This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 231acfeb4c8862b88d7f36e68e5ab3f4ed7740a7 Author: lahiruj <[email protected]> AuthorDate: Mon May 18 17:08:26 2026 -0400 Add ExternalIdentity and user_dns to core --- ...00010_external_identities_and_user_dns.down.sql | 19 +++ .../000010_external_identities_and_user_dns.up.sql | 45 ++++++ internal/server/server.go | 137 ++++++++++++++++++ internal/store/external_identity_store.go | 112 +++++++++++++++ internal/store/store.go | 41 ++++++ internal/store/user_dn_store.go | 88 ++++++++++++ pkg/events/external_identity_subscribe.go | 46 ++++++ pkg/events/types.go | 13 ++ pkg/events/user_dn_subscribe.go | 40 ++++++ pkg/models/identity.go | 27 ++++ pkg/service/external_identity.go | 159 +++++++++++++++++++++ pkg/service/service.go | 8 ++ pkg/service/user_dn.go | 124 ++++++++++++++++ 13 files changed, 859 insertions(+) diff --git a/internal/db/migrations/000010_external_identities_and_user_dns.down.sql b/internal/db/migrations/000010_external_identities_and_user_dns.down.sql new file mode 100644 index 000000000..88dae2ca8 --- /dev/null +++ b/internal/db/migrations/000010_external_identities_and_user_dns.down.sql @@ -0,0 +1,19 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS user_dns; +DROP TABLE IF EXISTS external_identities; diff --git a/internal/db/migrations/000010_external_identities_and_user_dns.up.sql b/internal/db/migrations/000010_external_identities_and_user_dns.up.sql new file mode 100644 index 000000000..375ae7e0e --- /dev/null +++ b/internal/db/migrations/000010_external_identities_and_user_dns.up.sql @@ -0,0 +1,45 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS external_identities +( + id VARCHAR(255) NOT NULL, + user_id VARCHAR(255) NOT NULL, + source VARCHAR(64) NOT NULL, + external_id VARCHAR(255) NOT NULL, + oidc_sub VARCHAR(255) NOT NULL DEFAULT '', + metadata TEXT NOT NULL, + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_external_identities_source_external (source, external_id), + KEY idx_external_identities_user (user_id), + KEY idx_external_identities_oidc_sub (oidc_sub), + CONSTRAINT fk_external_identities_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS user_dns +( + id VARCHAR(255) NOT NULL, + user_id VARCHAR(255) NOT NULL, + dn VARCHAR(512) NOT NULL, + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_user_dns_user_dn (user_id, dn), + KEY idx_user_dns_dn (dn), + CONSTRAINT fk_user_dns_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/server/server.go b/internal/server/server.go index fb98b7320..56b6a6bd7 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -116,6 +116,20 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /compute-allocations/{id}/usages/total", s.getTotalSUUsageForAllocation) s.mux.HandleFunc("GET /compute-allocations/{id}/users/{userId}/usages/total", s.getTotalSUUsageForUserInAllocation) s.mux.HandleFunc("GET /users/{id}/compute-allocation-usages", s.listUsagesByUser) + + s.mux.HandleFunc("POST /external-identities", s.createExternalIdentity) + s.mux.HandleFunc("GET /external-identities/{id}", s.getExternalIdentity) + s.mux.HandleFunc("PUT /external-identities/{id}", s.updateExternalIdentity) + s.mux.HandleFunc("DELETE /external-identities/{id}", s.deleteExternalIdentity) + s.mux.HandleFunc("GET /external-identities/by-source/{source}/{externalId}", s.getExternalIdentityBySource) + s.mux.HandleFunc("GET /external-identities/by-oidc-sub/{sub}", s.getExternalIdentityByOIDCSub) + s.mux.HandleFunc("GET /users/{id}/external-identities", s.listExternalIdentitiesForUser) + + s.mux.HandleFunc("POST /users/{id}/dns", s.addUserDN) + s.mux.HandleFunc("GET /user-dns/{id}", s.getUserDN) + s.mux.HandleFunc("DELETE /user-dns/{id}", s.removeUserDN) + s.mux.HandleFunc("GET /users/{id}/dns", s.listUserDNs) + s.mux.HandleFunc("GET /user-dns/by-dn", s.getUserDNByDN) } func (s *Server) healthz(w http.ResponseWriter, _ *http.Request) { @@ -706,6 +720,129 @@ func (s *Server) getTotalSUUsageForUserInAllocation(w http.ResponseWriter, r *ht }) } +func (s *Server) createExternalIdentity(w http.ResponseWriter, r *http.Request) { + var e models.ExternalIdentity + if err := decodeJSON(r, &e); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + created, err := s.svc.CreateExternalIdentity(r.Context(), &e) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getExternalIdentity(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetExternalIdentity(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, e) +} + +func (s *Server) updateExternalIdentity(w http.ResponseWriter, r *http.Request) { + var e models.ExternalIdentity + if err := decodeJSON(r, &e); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + e.ID = r.PathValue("id") + if err := s.svc.UpdateExternalIdentity(r.Context(), &e); err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, &e) +} + +func (s *Server) deleteExternalIdentity(w http.ResponseWriter, r *http.Request) { + if err := s.svc.DeleteExternalIdentity(r.Context(), r.PathValue("id")); err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusNoContent, nil) +} + +func (s *Server) getExternalIdentityBySource(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetExternalIdentityBySourceAndExternalID(r.Context(), r.PathValue("source"), r.PathValue("externalId")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, e) +} + +func (s *Server) getExternalIdentityByOIDCSub(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetExternalIdentityByOIDCSub(r.Context(), r.PathValue("sub")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, e) +} + +func (s *Server) listExternalIdentitiesForUser(w http.ResponseWriter, r *http.Request) { + out, err := s.svc.ListExternalIdentitiesForUser(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (s *Server) addUserDN(w http.ResponseWriter, r *http.Request) { + var d models.UserDN + if err := decodeJSON(r, &d); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + d.UserID = r.PathValue("id") + created, err := s.svc.AddUserDN(r.Context(), &d) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getUserDN(w http.ResponseWriter, r *http.Request) { + d, err := s.svc.GetUserDN(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, d) +} + +func (s *Server) removeUserDN(w http.ResponseWriter, r *http.Request) { + if err := s.svc.RemoveUserDN(r.Context(), r.PathValue("id")); err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusNoContent, nil) +} + +func (s *Server) listUserDNs(w http.ResponseWriter, r *http.Request) { + out, err := s.svc.ListUserDNs(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (s *Server) getUserDNByDN(w http.ResponseWriter, r *http.Request) { + dn := r.URL.Query().Get("dn") + d, err := s.svc.GetUserDNByDN(r.Context(), dn) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, d) +} + // LoggingMiddleware logs every request once it completes. func LoggingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/store/external_identity_store.go b/internal/store/external_identity_store.go new file mode 100644 index 000000000..41f9e099c --- /dev/null +++ b/internal/store/external_identity_store.go @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlExternalIdentityStore struct { + db *sqlx.DB +} + +// NewExternalIdentityStore returns a MySQL-backed ExternalIdentityStore. +func NewExternalIdentityStore(db *sqlx.DB) ExternalIdentityStore { + return &mysqlExternalIdentityStore{db: db} +} + +const externalIdentityColumns = `id, user_id, source, external_id, oidc_sub, metadata, created_at` + +func (s *mysqlExternalIdentityStore) FindByID(ctx context.Context, id string) (*models.ExternalIdentity, error) { + var e models.ExternalIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+externalIdentityColumns+` FROM external_identities WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlExternalIdentityStore) FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) { + var e models.ExternalIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+externalIdentityColumns+` FROM external_identities WHERE source = ? AND external_id = ?`, + source, externalID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlExternalIdentityStore) FindByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) { + var e models.ExternalIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+externalIdentityColumns+` FROM external_identities WHERE oidc_sub = ?`, oidcSub) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlExternalIdentityStore) FindByUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) { + var out []models.ExternalIdentity + err := s.db.SelectContext(ctx, &out, + `SELECT `+externalIdentityColumns+` FROM external_identities WHERE user_id = ? ORDER BY created_at ASC`, + userID) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *mysqlExternalIdentityStore) Create(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO external_identities (id, user_id, source, external_id, oidc_sub, metadata) + VALUES (?, ?, ?, ?, ?, ?)`, + e.ID, e.UserID, e.Source, e.ExternalID, e.OIDCSub, e.Metadata) + return err +} + +func (s *mysqlExternalIdentityStore) Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error { + _, err := tx.ExecContext(ctx, + `UPDATE external_identities + SET user_id = ?, source = ?, external_id = ?, oidc_sub = ?, metadata = ? + WHERE id = ?`, + e.UserID, e.Source, e.ExternalID, e.OIDCSub, e.Metadata, e.ID) + return err +} + +func (s *mysqlExternalIdentityStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM external_identities WHERE id = ?`, id) + return err +} diff --git a/internal/store/store.go b/internal/store/store.go index ba6faa9f0..2ba3caced 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -231,6 +231,47 @@ type ComputeAllocationMembershipStore interface { Delete(ctx context.Context, tx *sql.Tx, id string) error } +// ExternalIdentityStore defines persistence operations for the mapping +// between users and their identifiers in external systems. +// UNIQUE (source, external_id) is enforced at the schema level. +type ExternalIdentityStore interface { + // FindByID returns the external identity with the given ID, or nil if absent. + FindByID(ctx context.Context, id string) (*models.ExternalIdentity, error) + // FindBySourceAndExternalID returns the external identity for a (source, + // external_id) pair, or nil if absent. + FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) + // FindByOIDCSub returns the external identity matching an OIDC subject, or + // nil if absent. oidc_sub is not unique; first match wins. + FindByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) + // FindByUser returns every external identity belonging to the given user, + // ordered by created_at ascending. + FindByUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) + // Create inserts a new external identity within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error + // Update replaces mutable fields of an existing external identity within + // the provided transaction. + Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error + // Delete removes an external identity by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + +// UserDNStore defines persistence operations for the X.509 distinguished +// names bound to a user. Append-only: edits are Delete + Create. +type UserDNStore interface { + // FindByID returns the DN binding with the given ID, or nil if absent. + FindByID(ctx context.Context, id string) (*models.UserDN, error) + // FindByDN returns the DN binding matching the given distinguished name, + // or nil if absent. + FindByDN(ctx context.Context, dn string) (*models.UserDN, error) + // FindByUser returns every DN bound to the given user, ordered by + // created_at ascending. + FindByUser(ctx context.Context, userID string) ([]models.UserDN, error) + // Create inserts a new DN binding within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, d *models.UserDN) error + // Delete removes a DN binding by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + // ComputeAllocationUsageStore defines persistence operations for the // append-only log of resource consumption events charged against a compute // allocation. diff --git a/internal/store/user_dn_store.go b/internal/store/user_dn_store.go new file mode 100644 index 000000000..656413572 --- /dev/null +++ b/internal/store/user_dn_store.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlUserDNStore struct { + db *sqlx.DB +} + +// NewUserDNStore returns a MySQL-backed UserDNStore. +func NewUserDNStore(db *sqlx.DB) UserDNStore { + return &mysqlUserDNStore{db: db} +} + +const userDNColumns = `id, user_id, dn, created_at` + +func (s *mysqlUserDNStore) FindByID(ctx context.Context, id string) (*models.UserDN, error) { + var d models.UserDN + err := s.db.GetContext(ctx, &d, + `SELECT `+userDNColumns+` FROM user_dns WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &d, nil +} + +func (s *mysqlUserDNStore) FindByDN(ctx context.Context, dn string) (*models.UserDN, error) { + var d models.UserDN + err := s.db.GetContext(ctx, &d, + `SELECT `+userDNColumns+` FROM user_dns WHERE dn = ?`, dn) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &d, nil +} + +func (s *mysqlUserDNStore) FindByUser(ctx context.Context, userID string) ([]models.UserDN, error) { + var out []models.UserDN + err := s.db.SelectContext(ctx, &out, + `SELECT `+userDNColumns+` FROM user_dns WHERE user_id = ? ORDER BY created_at ASC`, + userID) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *mysqlUserDNStore) Create(ctx context.Context, tx *sql.Tx, d *models.UserDN) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO user_dns (id, user_id, dn) VALUES (?, ?, ?)`, + d.ID, d.UserID, d.DN) + return err +} + +func (s *mysqlUserDNStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM user_dns WHERE id = ?`, id) + return err +} diff --git a/pkg/events/external_identity_subscribe.go b/pkg/events/external_identity_subscribe.go new file mode 100644 index 000000000..27fd2f498 --- /dev/null +++ b/pkg/events/external_identity_subscribe.go @@ -0,0 +1,46 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ExternalIdentityHandler handles external identity lifecycle events with a typed payload. +type ExternalIdentityHandler func(ext models.ExternalIdentity) + +// SubscribeExternalIdentityCreated registers a typed handler invoked whenever an +// external_identity::create event is published. +func (b *Bus) SubscribeExternalIdentityCreated(handler ExternalIdentityHandler) { + b.subscribeExternalIdentity(ExternalIdentityCreateEvent, handler) +} + +// SubscribeExternalIdentityUpdated registers a typed handler invoked whenever an +// external_identity::update event is published. +func (b *Bus) SubscribeExternalIdentityUpdated(handler ExternalIdentityHandler) { + b.subscribeExternalIdentity(ExternalIdentityUpdateEvent, handler) +} + +// SubscribeExternalIdentityDeleted registers a typed handler invoked whenever an +// external_identity::delete event is published. +func (b *Bus) SubscribeExternalIdentityDeleted(handler ExternalIdentityHandler) { + b.subscribeExternalIdentity(ExternalIdentityDeleteEvent, handler) +} + +func (b *Bus) subscribeExternalIdentity(topic EventType, handler ExternalIdentityHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch e := value.(type) { + case models.ExternalIdentity: + handler(e) + case *models.ExternalIdentity: + if e != nil { + handler(*e) + } + default: + slog.Warn("external identity event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/types.go b/pkg/events/types.go index 11c900e94..0d274b83d 100644 --- a/pkg/events/types.go +++ b/pkg/events/types.go @@ -96,6 +96,19 @@ const ( ComputeAllocationResourceMappingDeleteEvent EventType = "compute_allocation_resource_mapping::delete" ) +// ExternalIdentity lifecycle message types. +const ( + ExternalIdentityCreateEvent EventType = "external_identity::create" + ExternalIdentityUpdateEvent EventType = "external_identity::update" + ExternalIdentityDeleteEvent EventType = "external_identity::delete" +) + +// UserDN lifecycle message types. DNs are append-only credentials; no update topic. +const ( + UserDNCreateEvent EventType = "user_dn::create" + UserDNDeleteEvent EventType = "user_dn::delete" +) + // Event represents a change in the system that downstream consumers may be interested in. // The payload is the full record after the change (e.g. the // new state of a project after an update). diff --git a/pkg/events/user_dn_subscribe.go b/pkg/events/user_dn_subscribe.go new file mode 100644 index 000000000..67b14fd33 --- /dev/null +++ b/pkg/events/user_dn_subscribe.go @@ -0,0 +1,40 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// UserDNHandler handles user DN lifecycle events with a typed payload. +type UserDNHandler func(d models.UserDN) + +// SubscribeUserDNCreated registers a typed handler invoked whenever a +// user_dn::create event is published. +func (b *Bus) SubscribeUserDNCreated(handler UserDNHandler) { + b.subscribeUserDN(UserDNCreateEvent, handler) +} + +// SubscribeUserDNDeleted registers a typed handler invoked whenever a +// user_dn::delete event is published. +func (b *Bus) SubscribeUserDNDeleted(handler UserDNHandler) { + b.subscribeUserDN(UserDNDeleteEvent, handler) +} + +func (b *Bus) subscribeUserDN(topic EventType, handler UserDNHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch d := value.(type) { + case models.UserDN: + handler(d) + case *models.UserDN: + if d != nil { + handler(*d) + } + default: + slog.Warn("user dn event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/models/identity.go b/pkg/models/identity.go new file mode 100644 index 000000000..5220795e0 --- /dev/null +++ b/pkg/models/identity.go @@ -0,0 +1,27 @@ +package models + +import "time" + +// ExternalIdentity links a User to its identifier in an external system +// (ACCESS, NAIRR, CILogon, etc.). One user may have many external identities. +// +// Source-specific attributes (e.g. NSF status code, ACCESS org code) belong in +// Metadata as a JSON-encoded blob. 'core' does not validate its shape. +type ExternalIdentity struct { + ID string `json:"id" db:"id"` + UserID string `json:"user_id" db:"user_id"` + Source string `json:"source" db:"source"` // e.g. access, nairr, cilogon, internal + ExternalID string `json:"external_id" db:"external_id"` // the source's native identifier + OIDCSub string `json:"oidc_sub,omitempty" db:"oidc_sub"` // OIDC subject when the source issues one + Metadata string `json:"metadata,omitempty" db:"metadata"` // JSON-encoded source-specific fields + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// UserDN binds an X.509 distinguished name (e.g. mTLS client cert subject) to a +// User. Append-only: DNs are credentials and are added or removed, never edited. +type UserDN struct { + ID string `json:"id" db:"id"` + UserID string `json:"user_id" db:"user_id"` + DN string `json:"dn" db:"dn"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} diff --git a/pkg/service/external_identity.go b/pkg/service/external_identity.go new file mode 100644 index 000000000..88799b904 --- /dev/null +++ b/pkg/service/external_identity.go @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateExternalIdentity persists a new external identity. If e.ID is empty, a +// new UUID is generated. The referenced user must already exists and the +// (source, external_id) is unique. +func (s *Service) CreateExternalIdentity(ctx context.Context, e *models.ExternalIdentity) (*models.ExternalIdentity, error) { + if e == nil { + return nil, fmt.Errorf("%w: external identity is nil", ErrInvalidInput) + } + if e.UserID == "" { + return nil, fmt.Errorf("%w: external identity user_id is required", ErrInvalidInput) + } + if e.Source == "" { + return nil, fmt.Errorf("%w: external identity source is required", ErrInvalidInput) + } + if e.ExternalID == "" { + return nil, fmt.Errorf("%w: external identity external_id is required", ErrInvalidInput) + } + + if user, err := s.users.FindByID(ctx, e.UserID); err != nil { + return nil, fmt.Errorf("verify user: %w", err) + } else if user == nil { + return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, e.UserID) + } + + if existing, err := s.extIDs.FindBySourceAndExternalID(ctx, e.Source, e.ExternalID); err != nil { + return nil, fmt.Errorf("lookup external identity: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: external identity for source %q, external_id %q", ErrAlreadyExists, e.Source, e.ExternalID) + } + + if e.ID == "" { + e.ID = newID() + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.extIDs.Create(ctx, tx, e) + }); err != nil { + return nil, fmt.Errorf("create external identity: %w", err) + } + + s.eventBus.Publish(events.ExternalIdentityCreateEvent, e) + return e, nil +} + +// GetExternalIdentity retrieves an external identity by ID. Returns +// ErrNotFound when no row matches. +func (s *Service) GetExternalIdentity(ctx context.Context, id string) (*models.ExternalIdentity, error) { + e, err := s.extIDs.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get external identity: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// GetExternalIdentityBySourceAndExternalID retrieves the unique external +// identity for the given (source, external_id) pair. +func (s *Service) GetExternalIdentityBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) { + e, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID) + if err != nil { + return nil, fmt.Errorf("get external identity by source/external_id: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// GetExternalIdentityByOIDCSub retrieves the first external identity matching +// the given OIDC subject. +func (s *Service) GetExternalIdentityByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) { + if oidcSub == "" { + return nil, fmt.Errorf("%w: oidc_sub is required", ErrInvalidInput) + } + e, err := s.extIDs.FindByOIDCSub(ctx, oidcSub) + if err != nil { + return nil, fmt.Errorf("get external identity by oidc_sub: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// ListExternalIdentitiesForUser returns every external identity belonging to +// the given user. +func (s *Service) ListExternalIdentitiesForUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) { + out, err := s.extIDs.FindByUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list external identities by user: %w", err) + } + return out, nil +} + +// UpdateExternalIdentity persists changes to an existing external identity. +func (s *Service) UpdateExternalIdentity(ctx context.Context, e *models.ExternalIdentity) error { + if e == nil || e.ID == "" { + return fmt.Errorf("%w: external identity id is required", ErrInvalidInput) + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.extIDs.Update(ctx, tx, e) + }); err != nil { + return fmt.Errorf("update external identity: %w", err) + } + + s.eventBus.Publish(events.ExternalIdentityUpdateEvent, e) + return nil +} + +// DeleteExternalIdentity removes an external identity by ID. +func (s *Service) DeleteExternalIdentity(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: external identity id is required", ErrInvalidInput) + } + e, err := s.extIDs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup external identity: %w", err) + } + if e == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.extIDs.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete external identity: %w", err) + } + + s.eventBus.Publish(events.ExternalIdentityDeleteEvent, e) + return nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 935630ac5..aa9538a52 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -49,6 +49,8 @@ type Service struct { changeEvents store.ComputeAllocationChangeRequestEventStore memberships store.ComputeAllocationMembershipStore usages store.ComputeAllocationUsageStore + extIDs store.ExternalIdentityStore + userDNs store.UserDNStore } // New constructs a Service backed by the supplied database handle. @@ -70,6 +72,8 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service { changeEvents: store.NewComputeAllocationChangeRequestEventStore(database), memberships: store.NewComputeAllocationMembershipStore(database), usages: store.NewComputeAllocationUsageStore(database), + extIDs: store.NewExternalIdentityStore(database), + userDNs: store.NewUserDNStore(database), } } @@ -92,6 +96,8 @@ func NewWithStores( changeEvents store.ComputeAllocationChangeRequestEventStore, memberships store.ComputeAllocationMembershipStore, usages store.ComputeAllocationUsageStore, + extIDs store.ExternalIdentityStore, + userDNs store.UserDNStore, ) *Service { return &Service{ db: database, @@ -109,6 +115,8 @@ func NewWithStores( changeEvents: changeEvents, memberships: memberships, usages: usages, + extIDs: extIDs, + userDNs: userDNs, } } diff --git a/pkg/service/user_dn.go b/pkg/service/user_dn.go new file mode 100644 index 000000000..65e07d3cf --- /dev/null +++ b/pkg/service/user_dn.go @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// AddUserDN binds a DN to a user. If d.ID is empty, a new UUID is generated. +// The referenced user must already exist; (user_id, dn) is unique. +func (s *Service) AddUserDN(ctx context.Context, d *models.UserDN) (*models.UserDN, error) { + if d == nil { + return nil, fmt.Errorf("%w: user dn is nil", ErrInvalidInput) + } + if d.UserID == "" { + return nil, fmt.Errorf("%w: user dn user_id is required", ErrInvalidInput) + } + if d.DN == "" { + return nil, fmt.Errorf("%w: user dn dn is required", ErrInvalidInput) + } + + if user, err := s.users.FindByID(ctx, d.UserID); err != nil { + return nil, fmt.Errorf("verify user: %w", err) + } else if user == nil { + return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, d.UserID) + } + + if existing, err := s.userDNs.FindByDN(ctx, d.DN); err != nil { + return nil, fmt.Errorf("lookup user dn: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: dn %q", ErrAlreadyExists, d.DN) + } + + if d.ID == "" { + d.ID = newID() + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.userDNs.Create(ctx, tx, d) + }); err != nil { + return nil, fmt.Errorf("add user dn: %w", err) + } + + s.eventBus.Publish(events.UserDNCreateEvent, d) + return d, nil +} + +// GetUserDN retrieves a DN binding by ID. Returns ErrNotFound when no row matches. +func (s *Service) GetUserDN(ctx context.Context, id string) (*models.UserDN, error) { + d, err := s.userDNs.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get user dn: %w", err) + } + if d == nil { + return nil, ErrNotFound + } + return d, nil +} + +// GetUserDNByDN performs a reverse lookup from DN to binding. +func (s *Service) GetUserDNByDN(ctx context.Context, dn string) (*models.UserDN, error) { + if dn == "" { + return nil, fmt.Errorf("%w: dn is required", ErrInvalidInput) + } + d, err := s.userDNs.FindByDN(ctx, dn) + if err != nil { + return nil, fmt.Errorf("get user dn by dn: %w", err) + } + if d == nil { + return nil, ErrNotFound + } + return d, nil +} + +// ListUserDNs returns every DN bound to the given user. +func (s *Service) ListUserDNs(ctx context.Context, userID string) ([]models.UserDN, error) { + out, err := s.userDNs.FindByUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list user dns: %w", err) + } + return out, nil +} + +// RemoveUserDN removes a DN binding by ID. +func (s *Service) RemoveUserDN(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: user dn id is required", ErrInvalidInput) + } + d, err := s.userDNs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup user dn: %w", err) + } + if d == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.userDNs.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("remove user dn: %w", err) + } + + s.eventBus.Publish(events.UserDNDeleteEvent, d) + return nil +}
