This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch access-integration in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 7eb46c5dfcdcf8d1af2b04fbd3598ec64a31e044 Author: lahiruj <[email protected]> AuthorDate: Tue May 19 20:52:02 2026 -0400 Persist allocations, cluster accounts, and memberships in AMIE handlers --- .env.example | 13 ++- connectors/ACCESS/AMIE-Processor/connector.go | 14 ++- .../ACCESS/AMIE-Processor/handler/handler.go | 67 +++++++++++++ .../handler/request_account_create.go | 110 +++++++++++++++------ .../handler/request_project_create.go | 73 ++++++++++++-- connectors/ACCESS/AMIE-Processor/model/audit.go | 1 + 6 files changed, 234 insertions(+), 44 deletions(-) diff --git a/.env.example b/.env.example index 032242839..343b8ed86 100644 --- a/.env.example +++ b/.env.example @@ -10,9 +10,20 @@ # Local dev creds (admin/admin) come from dev-ops/compose/dbinit/init-db.sh. DATABASE_DSN='admin:admin@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4&multiStatements=true' -# Optional. Listen address for the HTTP API.wa +# Optional. Listen address for the HTTP API. # HTTP_ADDR=:8080 # Optional. Database connection-pool sizing. # DB_MAX_OPEN_CONNS=25 # DB_MAX_IDLE_CONNS=5 + +# AMIE connector (ACCESS-AMIE). Required when the AMIE connector is enabled. +# AMIE_BASE_URL, AMIE_SITE_CODE, AMIE_API_KEY come from ACCESS coordination. +# AMIE_CLUSTER_ID — the compute_clusters.id this AMIE deployment writes to. +# One AMIE site is bound to one downstream cluster, so this is fixed per +# deployment. In dev, use the seed row id from +# dev-ops/compose/seeds/default_cluster.sql. +# AMIE_BASE_URL=https://amieclient.example.edu +# AMIE_SITE_CODE=TESTSITE +# AMIE_API_KEY=CHANGE_ME +# AMIE_CLUSTER_ID=00000000-0000-0000-0000-000000000001 diff --git a/connectors/ACCESS/AMIE-Processor/connector.go b/connectors/ACCESS/AMIE-Processor/connector.go index 7a79eb470..4b07322e6 100644 --- a/connectors/ACCESS/AMIE-Processor/connector.go +++ b/connectors/ACCESS/AMIE-Processor/connector.go @@ -64,16 +64,20 @@ func (amieConnector) Start(ctx context.Context, deps connectors.Deps) error { auditStore := store.NewAuditStore(deps.DB) auditSvc := service.NewAuditService(auditStore) - defaultOrgID := os.Getenv("AMIE_DEFAULT_ORG_ID") - if defaultOrgID == "" { - slog.Warn("AMIE_DEFAULT_ORG_ID not set; request_account_create and request_project_create will fail when creating new users") + // One AMIE site is tied to one downstream cluster by protocol, so the + // cluster id is a per-deployment config value rather than a per-packet + // lookup. Organizations are resolved per-packet from the *OrgCode and + // *Organization fields the AMIE packet carries. + clusterID := os.Getenv("AMIE_CLUSTER_ID") + if clusterID == "" { + slog.Warn("AMIE_CLUSTER_ID not set; request_project_create and request_account_create will fail when provisioning allocations/accounts") } amie := amieclient.New(cfg.AMIE) router := handler.NewRouter( - handler.NewRequestProjectCreateHandler(deps.Service, defaultOrgID, amie, auditSvc), - handler.NewRequestAccountCreateHandler(deps.Service, defaultOrgID, amie, auditSvc), + handler.NewRequestProjectCreateHandler(deps.Service, clusterID, amie, auditSvc), + handler.NewRequestAccountCreateHandler(deps.Service, clusterID, amie, auditSvc), handler.NewRequestProjectInactivateHandler(deps.Service, amie, auditSvc), handler.NewRequestProjectReactivateHandler(deps.Service, amie, auditSvc), handler.NewRequestAccountInactivateHandler(deps.Service, amie, auditSvc), diff --git a/connectors/ACCESS/AMIE-Processor/handler/handler.go b/connectors/ACCESS/AMIE-Processor/handler/handler.go index 3fccf2471..aca530b1c 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/handler.go +++ b/connectors/ACCESS/AMIE-Processor/handler/handler.go @@ -19,11 +19,18 @@ package handler import ( "context" + "crypto/rand" "database/sql" + "encoding/hex" + "errors" "fmt" + "strconv" "strings" + "time" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/pkg/models" + "github.com/apache/airavata-custos/pkg/service" ) type PacketHandler interface { @@ -104,6 +111,66 @@ func getResourceList(body map[string]any) []string { return result } +// ensureOrganization looks up an Organization by its originated_id (the +// AMIE-side org code such as "TEST123"); creates one if missing, using the +// human-readable organization name from the packet. +func ensureOrganization(ctx context.Context, svc *service.Service, code, name string) (*models.Organization, error) { + if code == "" { + return nil, fmt.Errorf("organization code is empty") + } + if org, err := svc.GetOrganizationByOriginatedID(ctx, code); err == nil { + return org, nil + } else if !errors.Is(err, service.ErrNotFound) { + return nil, err + } + if name == "" { + name = code + } + return svc.CreateOrganization(ctx, &models.Organization{ + OriginatedID: code, + Name: name, + }) +} + +// generateTempPosixUsername returns a placeholder posix username for a +// freshly provisioned ClusterAccount. +// +// TODO(amie-integration, username-policy): replace with a real policy +// (operator-configured prefix, deterministic mapping from UserGlobalID). +func generateTempPosixUsername() string { + var b [4]byte + _, _ = rand.Read(b[:]) + return "amie-" + hex.EncodeToString(b[:]) +} + +// getInt64 reads a string-encoded integer from a packet body field. AMIE +// transmits numeric fields like ServiceUnitsAllocated as JSON strings. +func getInt64(body map[string]any, key string) (int64, error) { + raw := getString(body, key) + if raw == "" { + return 0, fmt.Errorf("'%s' is empty", key) + } + n, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return 0, fmt.Errorf("'%s' is not an integer: %w", key, err) + } + return n, nil +} + +// getDate reads a YYYY-MM-DD string from a packet body field. Returns the +// parsed time in UTC. +func getDate(body map[string]any, key string) (time.Time, error) { + raw := getString(body, key) + if raw == "" { + return time.Time{}, fmt.Errorf("'%s' is empty", key) + } + t, err := time.Parse("2006-01-02", raw) + if err != nil { + return time.Time{}, fmt.Errorf("'%s' is not a YYYY-MM-DD date: %w", key, err) + } + return t, nil +} + func getDNList(body map[string]any) []string { v, ok := body["DnList"] if !ok { diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go index a6135a052..3d228c62e 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go @@ -29,23 +29,23 @@ import ( ) type RequestAccountCreateHandler struct { - svc *service.Service - defaultOrgID string - amieClient AmieClient - auditSvc AuditService + svc *service.Service + clusterID string + amieClient AmieClient + auditSvc AuditService } -func NewRequestAccountCreateHandler(svc *service.Service, defaultOrgID string, amieClient AmieClient, auditSvc AuditService) *RequestAccountCreateHandler { - return &RequestAccountCreateHandler{svc: svc, defaultOrgID: defaultOrgID, amieClient: amieClient, auditSvc: auditSvc} +func NewRequestAccountCreateHandler(svc *service.Service, clusterID string, amieClient AmieClient, auditSvc AuditService) *RequestAccountCreateHandler { + return &RequestAccountCreateHandler{svc: svc, clusterID: clusterID, amieClient: amieClient, auditSvc: auditSvc} } func (h *RequestAccountCreateHandler) SupportsType() string { return "request_account_create" } -// Handle is partial. It ensures the User (with ExternalIdentity) and confirms -// the Project exists in core, then audits the membership request. Two -// operations are still TODO: -// - ClusterAccount provisioning — username-generation policy -// - ComputeAllocationMembership creation +// Handle ensures the User (with ExternalIdentity), looks up the Project (which +// must already exist from a prior request_project_create), provisions a +// ClusterAccount on the configured cluster, and attaches a +// ComputeAllocationMembership against the project's allocation. Replies with +// the assigned posix username. func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { body, err := getBody(packetJSON) if err != nil { @@ -62,6 +62,9 @@ func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, pa if err := requireText(userGlobalID, "UserGlobalID"); err != nil { return err } + if h.clusterID == "" { + return fmt.Errorf("AMIE_CLUSTER_ID not configured") + } user, err := h.ensureUser(ctx, body, userGlobalID) if err != nil { @@ -71,24 +74,35 @@ func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, pa return fmt.Errorf("request_account_create: audit CREATE_PERSON: %w", err) } - if _, err := h.svc.GetProjectByOriginatedID(ctx, projectOriginatedID); err != nil { - if !errors.Is(err, service.ErrNotFound) { - return fmt.Errorf("request_account_create: lookup project: %w", err) - } - // TODO(amie-integration): create the Project here when AMIE carries enough - // metadata to do so safely (title, PI, grant number). - // request_account_create assumes the project was created earlier via - // request_project_create. + project, err := h.svc.GetProjectByOriginatedID(ctx, projectOriginatedID) + if err != nil { + return fmt.Errorf("request_account_create: project %q not found (request_project_create must precede this packet): %w", projectOriginatedID, err) + } + + allocations, err := h.svc.ListComputeAllocationsByProject(ctx, project.ID) + if err != nil { + return fmt.Errorf("request_account_create: list allocations: %w", err) + } + if len(allocations) == 0 { + return fmt.Errorf("request_account_create: project %q has no ComputeAllocation; request_project_create did not provision one", projectOriginatedID) } + allocation := allocations[0] - // TODO(amie-integration): provision a ClusterAccount via svc.CreateClusterAccount - // once a username-generation policy is in place. + account, err := h.ensureClusterAccount(ctx, user.ID) + if err != nil { + return fmt.Errorf("request_account_create: ensure cluster account: %w", err) + } + if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, model.AuditCreateAccount, "cluster_account", account.ID, account.Username); err != nil { + return fmt.Errorf("request_account_create: audit CREATE_ACCOUNT: %w", err) + } - // TODO(amie-integration): create a ComputeAllocationMembership via - // svc.CreateComputeAllocationMembership once we have a ComputeAllocation - // under this Project to attach to. role := normalizeRole(getString(body, "UserRole")) - if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, model.AuditCreateMembership, "membership_request", "", fmt.Sprintf("project=%s user=%s role=%s (membership persistence pending allocation mapping)", projectOriginatedID, user.ID, role)); err != nil { + membership, err := h.ensureMembership(ctx, allocation.ID, user.ID) + if err != nil { + return fmt.Errorf("request_account_create: ensure membership: %w", err) + } + if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, model.AuditCreateMembership, "compute_allocation_membership", membership.ID, + fmt.Sprintf("allocation=%s user=%s role=%s", allocation.ID, user.ID, role)); err != nil { return fmt.Errorf("request_account_create: audit CREATE_MEMBERSHIP: %w", err) } @@ -96,7 +110,7 @@ func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, pa "ProjectID": projectOriginatedID, "GrantNumber": getString(body, "GrantNumber"), "UserPersonID": user.ID, - "UserRemoteSiteLogin": getString(body, "UserGlobalID"), + "UserRemoteSiteLogin": account.Username, "ResourceList": getResourceList(body), } if v := getString(body, "UserOrgCode"); v != "" { @@ -119,11 +133,12 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx context.Context, body map[s return nil, err } - if h.defaultOrgID == "" { - return nil, fmt.Errorf("cannot create user: AMIE_DEFAULT_ORG_ID not configured") + org, err := ensureOrganization(ctx, h.svc, getString(body, "UserOrgCode"), getString(body, "UserOrganization")) + if err != nil { + return nil, fmt.Errorf("ensure user organization: %w", err) } user, err := h.svc.CreateUser(ctx, &models.User{ - OrganizationID: h.defaultOrgID, + OrganizationID: org.ID, FirstName: getString(body, "UserFirstName"), LastName: getString(body, "UserLastName"), Email: getString(body, "UserEmail"), @@ -140,3 +155,40 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx context.Context, body map[s } return user, nil } + +// ensureClusterAccount returns the user's existing cluster account on the +// configured cluster, or provisions a fresh one with a temp posix username. +func (h *RequestAccountCreateHandler) ensureClusterAccount(ctx context.Context, userID string) (*models.ClusterAccount, error) { + existing, err := h.svc.ListClusterAccountsForUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list cluster accounts: %w", err) + } + for _, a := range existing { + if a.ComputeClusterID == h.clusterID { + return &a, nil + } + } + return h.svc.CreateClusterAccount(ctx, &models.ClusterAccount{ + UserID: userID, + ComputeClusterID: h.clusterID, + Username: generateTempPosixUsername(), + }) +} + +// ensureMembership returns the existing (allocation, user) membership or +// creates a new one. Idempotent for re-delivered packets. +func (h *RequestAccountCreateHandler) ensureMembership(ctx context.Context, allocationID, userID string) (*models.ComputeAllocationMembership, error) { + existing, err := h.svc.ListMembersForAllocation(ctx, allocationID) + if err != nil { + return nil, fmt.Errorf("list memberships: %w", err) + } + for _, m := range existing { + if m.UserID == userID { + return &m, nil + } + } + return h.svc.CreateComputeAllocationMembership(ctx, &models.ComputeAllocationMembership{ + ComputeAllocationID: allocationID, + UserID: userID, + }) +} diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go index 21896a205..33a06dff5 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go @@ -29,18 +29,22 @@ import ( ) type RequestProjectCreateHandler struct { - svc *service.Service - defaultOrgID string - amieClient AmieClient - auditSvc AuditService + svc *service.Service + clusterID string + amieClient AmieClient + auditSvc AuditService } -func NewRequestProjectCreateHandler(svc *service.Service, defaultOrgID string, amieClient AmieClient, auditSvc AuditService) *RequestProjectCreateHandler { - return &RequestProjectCreateHandler{svc: svc, defaultOrgID: defaultOrgID, amieClient: amieClient, auditSvc: auditSvc} +func NewRequestProjectCreateHandler(svc *service.Service, clusterID string, amieClient AmieClient, auditSvc AuditService) *RequestProjectCreateHandler { + return &RequestProjectCreateHandler{svc: svc, clusterID: clusterID, amieClient: amieClient, auditSvc: auditSvc} } func (h *RequestProjectCreateHandler) SupportsType() string { return "request_project_create" } +// Handle ensures the PI user (resolving the organization from PiOrgCode + +// PiOrganization), creates (or finds) the Project, and creates a +// ComputeAllocation populated from the packet body's ServiceUnitsAllocated, +// StartDate and EndDate. func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { body, err := getBody(packetJSON) if err != nil { @@ -54,6 +58,9 @@ func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, pa if err := requireText(piGlobalID, "PiGlobalID"); err != nil { return err } + if h.clusterID == "" { + return fmt.Errorf("AMIE_CLUSTER_ID not configured") + } // AMIE protocol: request_project_create does not carry a ProjectID. The // receiving site assigns one. We use the GrantNumber as the originated_id // since it is the stable cross-site identifier on the AMIE side. @@ -75,6 +82,14 @@ func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, pa return fmt.Errorf("request_project_create: audit CREATE_PROJECT: %w", err) } + allocation, err := h.ensureAllocation(ctx, body, project.ID, grantNumber) + if err != nil { + return fmt.Errorf("request_project_create: ensure allocation: %w", err) + } + if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, model.AuditCreateAllocation, "compute_allocation", allocation.ID, ""); err != nil { + return fmt.Errorf("request_project_create: audit CREATE_ALLOCATION: %w", err) + } + replyBody := map[string]any{ "ProjectID": project.ID, "GrantNumber": grantNumber, @@ -98,11 +113,12 @@ func (h *RequestProjectCreateHandler) ensurePIUser(ctx context.Context, body map } else if !errors.Is(err, service.ErrNotFound) { return nil, err } - if h.defaultOrgID == "" { - return nil, fmt.Errorf("cannot create PI user: AMIE_DEFAULT_ORG_ID not configured") + org, err := ensureOrganization(ctx, h.svc, getString(body, "PiOrgCode"), getString(body, "PiOrganization")) + if err != nil { + return nil, fmt.Errorf("ensure PI organization: %w", err) } user, err := h.svc.CreateUser(ctx, &models.User{ - OrganizationID: h.defaultOrgID, + OrganizationID: org.ID, FirstName: getString(body, "PiFirstName"), LastName: getString(body, "PiLastName"), Email: getString(body, "PiEmail"), @@ -133,3 +149,42 @@ func (h *RequestProjectCreateHandler) ensureProject(ctx context.Context, origina ProjectPIID: piID, }) } + +// ensureAllocation creates a ComputeAllocation for the project if none exists +// yet. If one already exists (e.g. a repeat request_project_create signaling a +// supplement/renewal), the existing row is returned unchanged. +// +// TODO(amie-integration, allocation-type): branch on body["AllocationType"] +// (new / renewal / supplement / extension) and adjust the allocation +// accordingly. +func (h *RequestProjectCreateHandler) ensureAllocation(ctx context.Context, body map[string]any, projectID, grantNumber string) (*models.ComputeAllocation, error) { + existing, err := h.svc.ListComputeAllocationsByProject(ctx, projectID) + if err != nil { + return nil, fmt.Errorf("list allocations: %w", err) + } + if len(existing) > 0 { + return &existing[0], nil + } + + su, err := getInt64(body, "ServiceUnitsAllocated") + if err != nil { + return nil, err + } + start, err := getDate(body, "StartDate") + if err != nil { + return nil, err + } + end, err := getDate(body, "EndDate") + if err != nil { + return nil, err + } + + return h.svc.CreateComputeAllocation(ctx, &models.ComputeAllocation{ + ProjectID: projectID, + Name: grantNumber, + ComputeClusterID: h.clusterID, + InitialSUAmount: su, + StartTime: start, + EndTime: end, + }) +} diff --git a/connectors/ACCESS/AMIE-Processor/model/audit.go b/connectors/ACCESS/AMIE-Processor/model/audit.go index db80773a2..d6e1182ef 100644 --- a/connectors/ACCESS/AMIE-Processor/model/audit.go +++ b/connectors/ACCESS/AMIE-Processor/model/audit.go @@ -29,6 +29,7 @@ const ( AuditMergePersons AuditAction = "MERGE_PERSONS" AuditCreateAccount AuditAction = "CREATE_ACCOUNT" AuditCreateProject AuditAction = "CREATE_PROJECT" + AuditCreateAllocation AuditAction = "CREATE_ALLOCATION" AuditInactivateProject AuditAction = "INACTIVATE_PROJECT" AuditReactivateProject AuditAction = "REACTIVATE_PROJECT" AuditCreateMembership AuditAction = "CREATE_MEMBERSHIP"
