This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch access-integration
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 7eb46c5dfcdcf8d1af2b04fbd3598ec64a31e044
Author: lahiruj <[email protected]>
AuthorDate: Tue May 19 20:52:02 2026 -0400

    Persist allocations, cluster accounts, and memberships in AMIE handlers
---
 .env.example                                       |  13 ++-
 connectors/ACCESS/AMIE-Processor/connector.go      |  14 ++-
 .../ACCESS/AMIE-Processor/handler/handler.go       |  67 +++++++++++++
 .../handler/request_account_create.go              | 110 +++++++++++++++------
 .../handler/request_project_create.go              |  73 ++++++++++++--
 connectors/ACCESS/AMIE-Processor/model/audit.go    |   1 +
 6 files changed, 234 insertions(+), 44 deletions(-)

diff --git a/.env.example b/.env.example
index 032242839..343b8ed86 100644
--- a/.env.example
+++ b/.env.example
@@ -10,9 +10,20 @@
 # Local dev creds (admin/admin) come from dev-ops/compose/dbinit/init-db.sh.
 
DATABASE_DSN='admin:admin@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4&multiStatements=true'
 
-# Optional. Listen address for the HTTP API.wa
+# Optional. Listen address for the HTTP API.
 # HTTP_ADDR=:8080
 
 # Optional. Database connection-pool sizing.
 # DB_MAX_OPEN_CONNS=25
 # DB_MAX_IDLE_CONNS=5
+
+# AMIE connector (ACCESS-AMIE). Required when the AMIE connector is enabled.
+# AMIE_BASE_URL, AMIE_SITE_CODE, AMIE_API_KEY come from ACCESS coordination.
+# AMIE_CLUSTER_ID — the compute_clusters.id this AMIE deployment writes to.
+# One AMIE site is bound to one downstream cluster, so this is fixed per
+# deployment. In dev, use the seed row id from
+# dev-ops/compose/seeds/default_cluster.sql.
+# AMIE_BASE_URL=https://amieclient.example.edu
+# AMIE_SITE_CODE=TESTSITE
+# AMIE_API_KEY=CHANGE_ME
+# AMIE_CLUSTER_ID=00000000-0000-0000-0000-000000000001
diff --git a/connectors/ACCESS/AMIE-Processor/connector.go 
b/connectors/ACCESS/AMIE-Processor/connector.go
index 7a79eb470..4b07322e6 100644
--- a/connectors/ACCESS/AMIE-Processor/connector.go
+++ b/connectors/ACCESS/AMIE-Processor/connector.go
@@ -64,16 +64,20 @@ func (amieConnector) Start(ctx context.Context, deps 
connectors.Deps) error {
        auditStore := store.NewAuditStore(deps.DB)
        auditSvc := service.NewAuditService(auditStore)
 
-       defaultOrgID := os.Getenv("AMIE_DEFAULT_ORG_ID")
-       if defaultOrgID == "" {
-               slog.Warn("AMIE_DEFAULT_ORG_ID not set; request_account_create 
and request_project_create will fail when creating new users")
+       // One AMIE site is tied to one downstream cluster by protocol, so the
+       // cluster id is a per-deployment config value rather than a per-packet
+       // lookup. Organizations are resolved per-packet from the *OrgCode and
+       // *Organization fields the AMIE packet carries.
+       clusterID := os.Getenv("AMIE_CLUSTER_ID")
+       if clusterID == "" {
+               slog.Warn("AMIE_CLUSTER_ID not set; request_project_create and 
request_account_create will fail when provisioning allocations/accounts")
        }
 
        amie := amieclient.New(cfg.AMIE)
 
        router := handler.NewRouter(
-               handler.NewRequestProjectCreateHandler(deps.Service, 
defaultOrgID, amie, auditSvc),
-               handler.NewRequestAccountCreateHandler(deps.Service, 
defaultOrgID, amie, auditSvc),
+               handler.NewRequestProjectCreateHandler(deps.Service, clusterID, 
amie, auditSvc),
+               handler.NewRequestAccountCreateHandler(deps.Service, clusterID, 
amie, auditSvc),
                handler.NewRequestProjectInactivateHandler(deps.Service, amie, 
auditSvc),
                handler.NewRequestProjectReactivateHandler(deps.Service, amie, 
auditSvc),
                handler.NewRequestAccountInactivateHandler(deps.Service, amie, 
auditSvc),
diff --git a/connectors/ACCESS/AMIE-Processor/handler/handler.go 
b/connectors/ACCESS/AMIE-Processor/handler/handler.go
index 3fccf2471..aca530b1c 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/handler.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/handler.go
@@ -19,11 +19,18 @@ package handler
 
 import (
        "context"
+       "crypto/rand"
        "database/sql"
+       "encoding/hex"
+       "errors"
        "fmt"
+       "strconv"
        "strings"
+       "time"
 
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/pkg/models"
+       "github.com/apache/airavata-custos/pkg/service"
 )
 
 type PacketHandler interface {
@@ -104,6 +111,66 @@ func getResourceList(body map[string]any) []string {
        return result
 }
 
+// ensureOrganization looks up an Organization by its originated_id (the
+// AMIE-side org code such as "TEST123"); creates one if missing, using the
+// human-readable organization name from the packet.
+func ensureOrganization(ctx context.Context, svc *service.Service, code, name 
string) (*models.Organization, error) {
+       if code == "" {
+               return nil, fmt.Errorf("organization code is empty")
+       }
+       if org, err := svc.GetOrganizationByOriginatedID(ctx, code); err == nil 
{
+               return org, nil
+       } else if !errors.Is(err, service.ErrNotFound) {
+               return nil, err
+       }
+       if name == "" {
+               name = code
+       }
+       return svc.CreateOrganization(ctx, &models.Organization{
+               OriginatedID: code,
+               Name:         name,
+       })
+}
+
+// generateTempPosixUsername returns a placeholder posix username for a
+// freshly provisioned ClusterAccount.
+//
+// TODO(amie-integration, username-policy): replace with a real policy
+// (operator-configured prefix, deterministic mapping from UserGlobalID).
+func generateTempPosixUsername() string {
+       var b [4]byte
+       _, _ = rand.Read(b[:])
+       return "amie-" + hex.EncodeToString(b[:])
+}
+
+// getInt64 reads a string-encoded integer from a packet body field. AMIE
+// transmits numeric fields like ServiceUnitsAllocated as JSON strings.
+func getInt64(body map[string]any, key string) (int64, error) {
+       raw := getString(body, key)
+       if raw == "" {
+               return 0, fmt.Errorf("'%s' is empty", key)
+       }
+       n, err := strconv.ParseInt(raw, 10, 64)
+       if err != nil {
+               return 0, fmt.Errorf("'%s' is not an integer: %w", key, err)
+       }
+       return n, nil
+}
+
+// getDate reads a YYYY-MM-DD string from a packet body field. Returns the
+// parsed time in UTC.
+func getDate(body map[string]any, key string) (time.Time, error) {
+       raw := getString(body, key)
+       if raw == "" {
+               return time.Time{}, fmt.Errorf("'%s' is empty", key)
+       }
+       t, err := time.Parse("2006-01-02", raw)
+       if err != nil {
+               return time.Time{}, fmt.Errorf("'%s' is not a YYYY-MM-DD date: 
%w", key, err)
+       }
+       return t, nil
+}
+
 func getDNList(body map[string]any) []string {
        v, ok := body["DnList"]
        if !ok {
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
index a6135a052..3d228c62e 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
@@ -29,23 +29,23 @@ import (
 )
 
 type RequestAccountCreateHandler struct {
-       svc          *service.Service
-       defaultOrgID string
-       amieClient   AmieClient
-       auditSvc     AuditService
+       svc        *service.Service
+       clusterID  string
+       amieClient AmieClient
+       auditSvc   AuditService
 }
 
-func NewRequestAccountCreateHandler(svc *service.Service, defaultOrgID string, 
amieClient AmieClient, auditSvc AuditService) *RequestAccountCreateHandler {
-       return &RequestAccountCreateHandler{svc: svc, defaultOrgID: 
defaultOrgID, amieClient: amieClient, auditSvc: auditSvc}
+func NewRequestAccountCreateHandler(svc *service.Service, clusterID string, 
amieClient AmieClient, auditSvc AuditService) *RequestAccountCreateHandler {
+       return &RequestAccountCreateHandler{svc: svc, clusterID: clusterID, 
amieClient: amieClient, auditSvc: auditSvc}
 }
 
 func (h *RequestAccountCreateHandler) SupportsType() string { return 
"request_account_create" }
 
-// Handle is partial. It ensures the User (with ExternalIdentity) and confirms
-// the Project exists in core, then audits the membership request. Two
-// operations are still TODO:
-//   - ClusterAccount provisioning — username-generation policy
-//   - ComputeAllocationMembership creation
+// Handle ensures the User (with ExternalIdentity), looks up the Project (which
+// must already exist from a prior request_project_create), provisions a
+// ClusterAccount on the configured cluster, and attaches a
+// ComputeAllocationMembership against the project's allocation. Replies with
+// the assigned posix username.
 func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, 
packetJSON map[string]any, packet *model.Packet, eventID string) error {
        body, err := getBody(packetJSON)
        if err != nil {
@@ -62,6 +62,9 @@ func (h *RequestAccountCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
        if err := requireText(userGlobalID, "UserGlobalID"); err != nil {
                return err
        }
+       if h.clusterID == "" {
+               return fmt.Errorf("AMIE_CLUSTER_ID not configured")
+       }
 
        user, err := h.ensureUser(ctx, body, userGlobalID)
        if err != nil {
@@ -71,24 +74,35 @@ func (h *RequestAccountCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                return fmt.Errorf("request_account_create: audit CREATE_PERSON: 
%w", err)
        }
 
-       if _, err := h.svc.GetProjectByOriginatedID(ctx, projectOriginatedID); 
err != nil {
-               if !errors.Is(err, service.ErrNotFound) {
-                       return fmt.Errorf("request_account_create: lookup 
project: %w", err)
-               }
-               // TODO(amie-integration): create the Project here when AMIE 
carries enough
-               // metadata to do so safely (title, PI, grant number).
-               // request_account_create assumes the project was created 
earlier via
-               // request_project_create.
+       project, err := h.svc.GetProjectByOriginatedID(ctx, projectOriginatedID)
+       if err != nil {
+               return fmt.Errorf("request_account_create: project %q not found 
(request_project_create must precede this packet): %w", projectOriginatedID, 
err)
+       }
+
+       allocations, err := h.svc.ListComputeAllocationsByProject(ctx, 
project.ID)
+       if err != nil {
+               return fmt.Errorf("request_account_create: list allocations: 
%w", err)
+       }
+       if len(allocations) == 0 {
+               return fmt.Errorf("request_account_create: project %q has no 
ComputeAllocation; request_project_create did not provision one", 
projectOriginatedID)
        }
+       allocation := allocations[0]
 
-       // TODO(amie-integration): provision a ClusterAccount via 
svc.CreateClusterAccount
-       // once a username-generation policy is in place.
+       account, err := h.ensureClusterAccount(ctx, user.ID)
+       if err != nil {
+               return fmt.Errorf("request_account_create: ensure cluster 
account: %w", err)
+       }
+       if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
model.AuditCreateAccount, "cluster_account", account.ID, account.Username); err 
!= nil {
+               return fmt.Errorf("request_account_create: audit 
CREATE_ACCOUNT: %w", err)
+       }
 
-       // TODO(amie-integration): create a ComputeAllocationMembership via
-       // svc.CreateComputeAllocationMembership once we have a 
ComputeAllocation
-       // under this Project to attach to.
        role := normalizeRole(getString(body, "UserRole"))
-       if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
model.AuditCreateMembership, "membership_request", "", fmt.Sprintf("project=%s 
user=%s role=%s (membership persistence pending allocation mapping)", 
projectOriginatedID, user.ID, role)); err != nil {
+       membership, err := h.ensureMembership(ctx, allocation.ID, user.ID)
+       if err != nil {
+               return fmt.Errorf("request_account_create: ensure membership: 
%w", err)
+       }
+       if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
model.AuditCreateMembership, "compute_allocation_membership", membership.ID,
+               fmt.Sprintf("allocation=%s user=%s role=%s", allocation.ID, 
user.ID, role)); err != nil {
                return fmt.Errorf("request_account_create: audit 
CREATE_MEMBERSHIP: %w", err)
        }
 
@@ -96,7 +110,7 @@ func (h *RequestAccountCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                "ProjectID":           projectOriginatedID,
                "GrantNumber":         getString(body, "GrantNumber"),
                "UserPersonID":        user.ID,
-               "UserRemoteSiteLogin": getString(body, "UserGlobalID"),
+               "UserRemoteSiteLogin": account.Username,
                "ResourceList":        getResourceList(body),
        }
        if v := getString(body, "UserOrgCode"); v != "" {
@@ -119,11 +133,12 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx 
context.Context, body map[s
                return nil, err
        }
 
-       if h.defaultOrgID == "" {
-               return nil, fmt.Errorf("cannot create user: AMIE_DEFAULT_ORG_ID 
not configured")
+       org, err := ensureOrganization(ctx, h.svc, getString(body, 
"UserOrgCode"), getString(body, "UserOrganization"))
+       if err != nil {
+               return nil, fmt.Errorf("ensure user organization: %w", err)
        }
        user, err := h.svc.CreateUser(ctx, &models.User{
-               OrganizationID: h.defaultOrgID,
+               OrganizationID: org.ID,
                FirstName:      getString(body, "UserFirstName"),
                LastName:       getString(body, "UserLastName"),
                Email:          getString(body, "UserEmail"),
@@ -140,3 +155,40 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx 
context.Context, body map[s
        }
        return user, nil
 }
+
+// ensureClusterAccount returns the user's existing cluster account on the
+// configured cluster, or provisions a fresh one with a temp posix username.
+func (h *RequestAccountCreateHandler) ensureClusterAccount(ctx 
context.Context, userID string) (*models.ClusterAccount, error) {
+       existing, err := h.svc.ListClusterAccountsForUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("list cluster accounts: %w", err)
+       }
+       for _, a := range existing {
+               if a.ComputeClusterID == h.clusterID {
+                       return &a, nil
+               }
+       }
+       return h.svc.CreateClusterAccount(ctx, &models.ClusterAccount{
+               UserID:           userID,
+               ComputeClusterID: h.clusterID,
+               Username:         generateTempPosixUsername(),
+       })
+}
+
+// ensureMembership returns the existing (allocation, user) membership or
+// creates a new one. Idempotent for re-delivered packets.
+func (h *RequestAccountCreateHandler) ensureMembership(ctx context.Context, 
allocationID, userID string) (*models.ComputeAllocationMembership, error) {
+       existing, err := h.svc.ListMembersForAllocation(ctx, allocationID)
+       if err != nil {
+               return nil, fmt.Errorf("list memberships: %w", err)
+       }
+       for _, m := range existing {
+               if m.UserID == userID {
+                       return &m, nil
+               }
+       }
+       return h.svc.CreateComputeAllocationMembership(ctx, 
&models.ComputeAllocationMembership{
+               ComputeAllocationID: allocationID,
+               UserID:              userID,
+       })
+}
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 21896a205..33a06dff5 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -29,18 +29,22 @@ import (
 )
 
 type RequestProjectCreateHandler struct {
-       svc          *service.Service
-       defaultOrgID string
-       amieClient   AmieClient
-       auditSvc     AuditService
+       svc        *service.Service
+       clusterID  string
+       amieClient AmieClient
+       auditSvc   AuditService
 }
 
-func NewRequestProjectCreateHandler(svc *service.Service, defaultOrgID string, 
amieClient AmieClient, auditSvc AuditService) *RequestProjectCreateHandler {
-       return &RequestProjectCreateHandler{svc: svc, defaultOrgID: 
defaultOrgID, amieClient: amieClient, auditSvc: auditSvc}
+func NewRequestProjectCreateHandler(svc *service.Service, clusterID string, 
amieClient AmieClient, auditSvc AuditService) *RequestProjectCreateHandler {
+       return &RequestProjectCreateHandler{svc: svc, clusterID: clusterID, 
amieClient: amieClient, auditSvc: auditSvc}
 }
 
 func (h *RequestProjectCreateHandler) SupportsType() string { return 
"request_project_create" }
 
+// Handle ensures the PI user (resolving the organization from PiOrgCode +
+// PiOrganization), creates (or finds) the Project, and creates a
+// ComputeAllocation populated from the packet body's ServiceUnitsAllocated,
+// StartDate and EndDate.
 func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, 
packetJSON map[string]any, packet *model.Packet, eventID string) error {
        body, err := getBody(packetJSON)
        if err != nil {
@@ -54,6 +58,9 @@ func (h *RequestProjectCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
        if err := requireText(piGlobalID, "PiGlobalID"); err != nil {
                return err
        }
+       if h.clusterID == "" {
+               return fmt.Errorf("AMIE_CLUSTER_ID not configured")
+       }
        // AMIE protocol: request_project_create does not carry a ProjectID. The
        // receiving site assigns one. We use the GrantNumber as the 
originated_id
        // since it is the stable cross-site identifier on the AMIE side.
@@ -75,6 +82,14 @@ func (h *RequestProjectCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                return fmt.Errorf("request_project_create: audit 
CREATE_PROJECT: %w", err)
        }
 
+       allocation, err := h.ensureAllocation(ctx, body, project.ID, 
grantNumber)
+       if err != nil {
+               return fmt.Errorf("request_project_create: ensure allocation: 
%w", err)
+       }
+       if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
model.AuditCreateAllocation, "compute_allocation", allocation.ID, ""); err != 
nil {
+               return fmt.Errorf("request_project_create: audit 
CREATE_ALLOCATION: %w", err)
+       }
+
        replyBody := map[string]any{
                "ProjectID":         project.ID,
                "GrantNumber":       grantNumber,
@@ -98,11 +113,12 @@ func (h *RequestProjectCreateHandler) ensurePIUser(ctx 
context.Context, body map
        } else if !errors.Is(err, service.ErrNotFound) {
                return nil, err
        }
-       if h.defaultOrgID == "" {
-               return nil, fmt.Errorf("cannot create PI user: 
AMIE_DEFAULT_ORG_ID not configured")
+       org, err := ensureOrganization(ctx, h.svc, getString(body, 
"PiOrgCode"), getString(body, "PiOrganization"))
+       if err != nil {
+               return nil, fmt.Errorf("ensure PI organization: %w", err)
        }
        user, err := h.svc.CreateUser(ctx, &models.User{
-               OrganizationID: h.defaultOrgID,
+               OrganizationID: org.ID,
                FirstName:      getString(body, "PiFirstName"),
                LastName:       getString(body, "PiLastName"),
                Email:          getString(body, "PiEmail"),
@@ -133,3 +149,42 @@ func (h *RequestProjectCreateHandler) ensureProject(ctx 
context.Context, origina
                ProjectPIID:  piID,
        })
 }
+
+// ensureAllocation creates a ComputeAllocation for the project if none exists
+// yet. If one already exists (e.g. a repeat request_project_create signaling a
+// supplement/renewal), the existing row is returned unchanged.
+//
+// TODO(amie-integration, allocation-type): branch on body["AllocationType"]
+// (new / renewal / supplement / extension) and adjust the allocation
+// accordingly.
+func (h *RequestProjectCreateHandler) ensureAllocation(ctx context.Context, 
body map[string]any, projectID, grantNumber string) (*models.ComputeAllocation, 
error) {
+       existing, err := h.svc.ListComputeAllocationsByProject(ctx, projectID)
+       if err != nil {
+               return nil, fmt.Errorf("list allocations: %w", err)
+       }
+       if len(existing) > 0 {
+               return &existing[0], nil
+       }
+
+       su, err := getInt64(body, "ServiceUnitsAllocated")
+       if err != nil {
+               return nil, err
+       }
+       start, err := getDate(body, "StartDate")
+       if err != nil {
+               return nil, err
+       }
+       end, err := getDate(body, "EndDate")
+       if err != nil {
+               return nil, err
+       }
+
+       return h.svc.CreateComputeAllocation(ctx, &models.ComputeAllocation{
+               ProjectID:        projectID,
+               Name:             grantNumber,
+               ComputeClusterID: h.clusterID,
+               InitialSUAmount:  su,
+               StartTime:        start,
+               EndTime:          end,
+       })
+}
diff --git a/connectors/ACCESS/AMIE-Processor/model/audit.go 
b/connectors/ACCESS/AMIE-Processor/model/audit.go
index db80773a2..d6e1182ef 100644
--- a/connectors/ACCESS/AMIE-Processor/model/audit.go
+++ b/connectors/ACCESS/AMIE-Processor/model/audit.go
@@ -29,6 +29,7 @@ const (
        AuditMergePersons         AuditAction = "MERGE_PERSONS"
        AuditCreateAccount        AuditAction = "CREATE_ACCOUNT"
        AuditCreateProject        AuditAction = "CREATE_PROJECT"
+       AuditCreateAllocation     AuditAction = "CREATE_ALLOCATION"
        AuditInactivateProject    AuditAction = "INACTIVATE_PROJECT"
        AuditReactivateProject    AuditAction = "REACTIVATE_PROJECT"
        AuditCreateMembership     AuditAction = "CREATE_MEMBERSHIP"

Reply via email to