This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 231acfeb4c8862b88d7f36e68e5ab3f4ed7740a7
Author: lahiruj <[email protected]>
AuthorDate: Mon May 18 17:08:26 2026 -0400

    Add ExternalIdentity and user_dns to core
---
 ...00010_external_identities_and_user_dns.down.sql |  19 +++
 .../000010_external_identities_and_user_dns.up.sql |  45 ++++++
 internal/server/server.go                          | 137 ++++++++++++++++++
 internal/store/external_identity_store.go          | 112 +++++++++++++++
 internal/store/store.go                            |  41 ++++++
 internal/store/user_dn_store.go                    |  88 ++++++++++++
 pkg/events/external_identity_subscribe.go          |  46 ++++++
 pkg/events/types.go                                |  13 ++
 pkg/events/user_dn_subscribe.go                    |  40 ++++++
 pkg/models/identity.go                             |  27 ++++
 pkg/service/external_identity.go                   | 159 +++++++++++++++++++++
 pkg/service/service.go                             |   8 ++
 pkg/service/user_dn.go                             | 124 ++++++++++++++++
 13 files changed, 859 insertions(+)

diff --git 
a/internal/db/migrations/000010_external_identities_and_user_dns.down.sql 
b/internal/db/migrations/000010_external_identities_and_user_dns.down.sql
new file mode 100644
index 000000000..88dae2ca8
--- /dev/null
+++ b/internal/db/migrations/000010_external_identities_and_user_dns.down.sql
@@ -0,0 +1,19 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS user_dns;
+DROP TABLE IF EXISTS external_identities;
diff --git 
a/internal/db/migrations/000010_external_identities_and_user_dns.up.sql 
b/internal/db/migrations/000010_external_identities_and_user_dns.up.sql
new file mode 100644
index 000000000..375ae7e0e
--- /dev/null
+++ b/internal/db/migrations/000010_external_identities_and_user_dns.up.sql
@@ -0,0 +1,45 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS external_identities
+(
+    id          VARCHAR(255) NOT NULL,
+    user_id     VARCHAR(255) NOT NULL,
+    source      VARCHAR(64)  NOT NULL,
+    external_id VARCHAR(255) NOT NULL,
+    oidc_sub    VARCHAR(255) NOT NULL DEFAULT '',
+    metadata    TEXT         NOT NULL,
+    created_at  TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at  TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE 
CURRENT_TIMESTAMP(6),
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_external_identities_source_external (source, external_id),
+    KEY idx_external_identities_user (user_id),
+    KEY idx_external_identities_oidc_sub (oidc_sub),
+    CONSTRAINT fk_external_identities_user FOREIGN KEY (user_id) REFERENCES 
users (id) ON DELETE CASCADE
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS user_dns
+(
+    id         VARCHAR(255) NOT NULL,
+    user_id    VARCHAR(255) NOT NULL,
+    dn         VARCHAR(512) NOT NULL,
+    created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_user_dns_user_dn (user_id, dn),
+    KEY idx_user_dns_dn (dn),
+    CONSTRAINT fk_user_dns_user FOREIGN KEY (user_id) REFERENCES users (id) ON 
DELETE CASCADE
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
diff --git a/internal/server/server.go b/internal/server/server.go
index fb98b7320..56b6a6bd7 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -116,6 +116,20 @@ func (s *Server) routes() {
        s.mux.HandleFunc("GET /compute-allocations/{id}/usages/total", 
s.getTotalSUUsageForAllocation)
        s.mux.HandleFunc("GET 
/compute-allocations/{id}/users/{userId}/usages/total", 
s.getTotalSUUsageForUserInAllocation)
        s.mux.HandleFunc("GET /users/{id}/compute-allocation-usages", 
s.listUsagesByUser)
+
+       s.mux.HandleFunc("POST /external-identities", s.createExternalIdentity)
+       s.mux.HandleFunc("GET /external-identities/{id}", s.getExternalIdentity)
+       s.mux.HandleFunc("PUT /external-identities/{id}", 
s.updateExternalIdentity)
+       s.mux.HandleFunc("DELETE /external-identities/{id}", 
s.deleteExternalIdentity)
+       s.mux.HandleFunc("GET 
/external-identities/by-source/{source}/{externalId}", 
s.getExternalIdentityBySource)
+       s.mux.HandleFunc("GET /external-identities/by-oidc-sub/{sub}", 
s.getExternalIdentityByOIDCSub)
+       s.mux.HandleFunc("GET /users/{id}/external-identities", 
s.listExternalIdentitiesForUser)
+
+       s.mux.HandleFunc("POST /users/{id}/dns", s.addUserDN)
+       s.mux.HandleFunc("GET /user-dns/{id}", s.getUserDN)
+       s.mux.HandleFunc("DELETE /user-dns/{id}", s.removeUserDN)
+       s.mux.HandleFunc("GET /users/{id}/dns", s.listUserDNs)
+       s.mux.HandleFunc("GET /user-dns/by-dn", s.getUserDNByDN)
 }
 
 func (s *Server) healthz(w http.ResponseWriter, _ *http.Request) {
@@ -706,6 +720,129 @@ func (s *Server) getTotalSUUsageForUserInAllocation(w 
http.ResponseWriter, r *ht
        })
 }
 
+func (s *Server) createExternalIdentity(w http.ResponseWriter, r 
*http.Request) {
+       var e models.ExternalIdentity
+       if err := decodeJSON(r, &e); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       created, err := s.svc.CreateExternalIdentity(r.Context(), &e)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusCreated, created)
+}
+
+func (s *Server) getExternalIdentity(w http.ResponseWriter, r *http.Request) {
+       e, err := s.svc.GetExternalIdentity(r.Context(), r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, e)
+}
+
+func (s *Server) updateExternalIdentity(w http.ResponseWriter, r 
*http.Request) {
+       var e models.ExternalIdentity
+       if err := decodeJSON(r, &e); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       e.ID = r.PathValue("id")
+       if err := s.svc.UpdateExternalIdentity(r.Context(), &e); err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, &e)
+}
+
+func (s *Server) deleteExternalIdentity(w http.ResponseWriter, r 
*http.Request) {
+       if err := s.svc.DeleteExternalIdentity(r.Context(), r.PathValue("id")); 
err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusNoContent, nil)
+}
+
+func (s *Server) getExternalIdentityBySource(w http.ResponseWriter, r 
*http.Request) {
+       e, err := s.svc.GetExternalIdentityBySourceAndExternalID(r.Context(), 
r.PathValue("source"), r.PathValue("externalId"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, e)
+}
+
+func (s *Server) getExternalIdentityByOIDCSub(w http.ResponseWriter, r 
*http.Request) {
+       e, err := s.svc.GetExternalIdentityByOIDCSub(r.Context(), 
r.PathValue("sub"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, e)
+}
+
+func (s *Server) listExternalIdentitiesForUser(w http.ResponseWriter, r 
*http.Request) {
+       out, err := s.svc.ListExternalIdentitiesForUser(r.Context(), 
r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, out)
+}
+
+func (s *Server) addUserDN(w http.ResponseWriter, r *http.Request) {
+       var d models.UserDN
+       if err := decodeJSON(r, &d); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       d.UserID = r.PathValue("id")
+       created, err := s.svc.AddUserDN(r.Context(), &d)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusCreated, created)
+}
+
+func (s *Server) getUserDN(w http.ResponseWriter, r *http.Request) {
+       d, err := s.svc.GetUserDN(r.Context(), r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, d)
+}
+
+func (s *Server) removeUserDN(w http.ResponseWriter, r *http.Request) {
+       if err := s.svc.RemoveUserDN(r.Context(), r.PathValue("id")); err != 
nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusNoContent, nil)
+}
+
+func (s *Server) listUserDNs(w http.ResponseWriter, r *http.Request) {
+       out, err := s.svc.ListUserDNs(r.Context(), r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, out)
+}
+
+// getUserDNByDN handles GET /user-dns/by-dn. The distinguished name is taken
+// from the "dn" query parameter and handed to the service unchanged; the
+// binding it returns is written as JSON with status 200, and any service
+// error is reported through writeServiceError.
+func (s *Server) getUserDNByDN(w http.ResponseWriter, r *http.Request) {
+       dn := r.URL.Query().Get("dn")
+       d, err := s.svc.GetUserDNByDN(r.Context(), dn)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, d)
+}
+
 // LoggingMiddleware logs every request once it completes.
 func LoggingMiddleware(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
diff --git a/internal/store/external_identity_store.go 
b/internal/store/external_identity_store.go
new file mode 100644
index 000000000..41f9e099c
--- /dev/null
+++ b/internal/store/external_identity_store.go
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+type mysqlExternalIdentityStore struct {
+       db *sqlx.DB
+}
+
+// NewExternalIdentityStore returns a MySQL-backed ExternalIdentityStore.
+func NewExternalIdentityStore(db *sqlx.DB) ExternalIdentityStore {
+       return &mysqlExternalIdentityStore{db: db}
+}
+
+const externalIdentityColumns = `id, user_id, source, external_id, oidc_sub, 
metadata, created_at`
+
+func (s *mysqlExternalIdentityStore) FindByID(ctx context.Context, id string) 
(*models.ExternalIdentity, error) {
+       var e models.ExternalIdentity
+       err := s.db.GetContext(ctx, &e,
+               `SELECT `+externalIdentityColumns+` FROM external_identities 
WHERE id = ?`, id)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &e, nil
+}
+
+func (s *mysqlExternalIdentityStore) FindBySourceAndExternalID(ctx 
context.Context, source, externalID string) (*models.ExternalIdentity, error) {
+       var e models.ExternalIdentity
+       err := s.db.GetContext(ctx, &e,
+               `SELECT `+externalIdentityColumns+` FROM external_identities 
WHERE source = ? AND external_id = ?`,
+               source, externalID)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &e, nil
+}
+
+// FindByOIDCSub returns the external identity whose oidc_sub equals oidcSub,
+// or (nil, nil) when there is none. oidc_sub carries no unique constraint, so
+// several rows may match; the oldest one (by created_at, then id) is returned
+// so that "first match wins" gives the same answer on every call.
+func (s *mysqlExternalIdentityStore) FindByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) {
+       var e models.ExternalIdentity
+       // Without an ORDER BY the row picked among duplicates is up to the
+       // database; pin it explicitly.
+       err := s.db.GetContext(ctx, &e,
+               `SELECT `+externalIdentityColumns+` FROM external_identities WHERE oidc_sub = ? ORDER BY created_at ASC, id ASC LIMIT 1`,
+               oidcSub)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &e, nil
+}
+
+func (s *mysqlExternalIdentityStore) FindByUser(ctx context.Context, userID 
string) ([]models.ExternalIdentity, error) {
+       var out []models.ExternalIdentity
+       err := s.db.SelectContext(ctx, &out,
+               `SELECT `+externalIdentityColumns+` FROM external_identities 
WHERE user_id = ? ORDER BY created_at ASC`,
+               userID)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (s *mysqlExternalIdentityStore) Create(ctx context.Context, tx *sql.Tx, e 
*models.ExternalIdentity) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO external_identities (id, user_id, source, 
external_id, oidc_sub, metadata)
+                VALUES (?, ?, ?, ?, ?, ?)`,
+               e.ID, e.UserID, e.Source, e.ExternalID, e.OIDCSub, e.Metadata)
+       return err
+}
+
+func (s *mysqlExternalIdentityStore) Update(ctx context.Context, tx *sql.Tx, e 
*models.ExternalIdentity) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE external_identities
+                   SET user_id = ?, source = ?, external_id = ?, oidc_sub = ?, 
metadata = ?
+                 WHERE id = ?`,
+               e.UserID, e.Source, e.ExternalID, e.OIDCSub, e.Metadata, e.ID)
+       return err
+}
+
+func (s *mysqlExternalIdentityStore) Delete(ctx context.Context, tx *sql.Tx, 
id string) error {
+       _, err := tx.ExecContext(ctx, `DELETE FROM external_identities WHERE id 
= ?`, id)
+       return err
+}
diff --git a/internal/store/store.go b/internal/store/store.go
index ba6faa9f0..2ba3caced 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -231,6 +231,47 @@ type ComputeAllocationMembershipStore interface {
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
 
+// ExternalIdentityStore defines persistence operations for the mapping
+// between users and their identifiers in external systems.
+// UNIQUE (source, external_id) is enforced at the schema level.
+type ExternalIdentityStore interface {
+       // FindByID returns the external identity with the given ID, or nil if 
absent.
+       FindByID(ctx context.Context, id string) (*models.ExternalIdentity, 
error)
+       // FindBySourceAndExternalID returns the external identity for a 
(source,
+       // external_id) pair, or nil if absent.
+       FindBySourceAndExternalID(ctx context.Context, source, externalID 
string) (*models.ExternalIdentity, error)
+       // FindByOIDCSub returns the external identity matching an OIDC 
subject, or
+       // nil if absent. oidc_sub is not unique; first match wins.
+       FindByOIDCSub(ctx context.Context, oidcSub string) 
(*models.ExternalIdentity, error)
+       // FindByUser returns every external identity belonging to the given 
user,
+       // ordered by created_at ascending.
+       FindByUser(ctx context.Context, userID string) 
([]models.ExternalIdentity, error)
+       // Create inserts a new external identity within the provided 
transaction.
+       Create(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) 
error
+       // Update replaces mutable fields of an existing external identity 
within
+       // the provided transaction.
+       Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) 
error
+       // Delete removes an external identity by ID within the provided 
transaction.
+       Delete(ctx context.Context, tx *sql.Tx, id string) error
+}
+
+// UserDNStore defines persistence operations for the X.509 distinguished
+// names bound to a user. Append-only: edits are Delete + Create.
+type UserDNStore interface {
+       // FindByID returns the DN binding with the given ID, or nil if absent.
+       FindByID(ctx context.Context, id string) (*models.UserDN, error)
+       // FindByDN returns the DN binding matching the given distinguished 
name,
+       // or nil if absent.
+       FindByDN(ctx context.Context, dn string) (*models.UserDN, error)
+       // FindByUser returns every DN bound to the given user, ordered by
+       // created_at ascending.
+       FindByUser(ctx context.Context, userID string) ([]models.UserDN, error)
+       // Create inserts a new DN binding within the provided transaction.
+       Create(ctx context.Context, tx *sql.Tx, d *models.UserDN) error
+       // Delete removes a DN binding by ID within the provided transaction.
+       Delete(ctx context.Context, tx *sql.Tx, id string) error
+}
+
 // ComputeAllocationUsageStore defines persistence operations for the
 // append-only log of resource consumption events charged against a compute
 // allocation.
diff --git a/internal/store/user_dn_store.go b/internal/store/user_dn_store.go
new file mode 100644
index 000000000..656413572
--- /dev/null
+++ b/internal/store/user_dn_store.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+type mysqlUserDNStore struct {
+       db *sqlx.DB
+}
+
+// NewUserDNStore returns a MySQL-backed UserDNStore.
+func NewUserDNStore(db *sqlx.DB) UserDNStore {
+       return &mysqlUserDNStore{db: db}
+}
+
+const userDNColumns = `id, user_id, dn, created_at`
+
+func (s *mysqlUserDNStore) FindByID(ctx context.Context, id string) 
(*models.UserDN, error) {
+       var d models.UserDN
+       err := s.db.GetContext(ctx, &d,
+               `SELECT `+userDNColumns+` FROM user_dns WHERE id = ?`, id)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &d, nil
+}
+
+// FindByDN returns the binding whose dn equals the given distinguished name,
+// or (nil, nil) when there is none. The schema enforces uniqueness only per
+// (user_id, dn), so the same DN can be bound to more than one user; the oldest
+// binding (by created_at, then id) is returned so the result is stable.
+func (s *mysqlUserDNStore) FindByDN(ctx context.Context, dn string) (*models.UserDN, error) {
+       var d models.UserDN
+       // Without an ORDER BY the row picked among duplicates is up to the
+       // database; pin it explicitly.
+       err := s.db.GetContext(ctx, &d,
+               `SELECT `+userDNColumns+` FROM user_dns WHERE dn = ? ORDER BY created_at ASC, id ASC LIMIT 1`,
+               dn)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &d, nil
+}
+
+func (s *mysqlUserDNStore) FindByUser(ctx context.Context, userID string) 
([]models.UserDN, error) {
+       var out []models.UserDN
+       err := s.db.SelectContext(ctx, &out,
+               `SELECT `+userDNColumns+` FROM user_dns WHERE user_id = ? ORDER 
BY created_at ASC`,
+               userID)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (s *mysqlUserDNStore) Create(ctx context.Context, tx *sql.Tx, d 
*models.UserDN) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO user_dns (id, user_id, dn) VALUES (?, ?, ?)`,
+               d.ID, d.UserID, d.DN)
+       return err
+}
+
+func (s *mysqlUserDNStore) Delete(ctx context.Context, tx *sql.Tx, id string) 
error {
+       _, err := tx.ExecContext(ctx, `DELETE FROM user_dns WHERE id = ?`, id)
+       return err
+}
diff --git a/pkg/events/external_identity_subscribe.go 
b/pkg/events/external_identity_subscribe.go
new file mode 100644
index 000000000..27fd2f498
--- /dev/null
+++ b/pkg/events/external_identity_subscribe.go
@@ -0,0 +1,46 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ExternalIdentityHandler handles external identity lifecycle events with a 
typed payload.
+type ExternalIdentityHandler func(ext models.ExternalIdentity)
+
+// SubscribeExternalIdentityCreated registers a typed handler invoked whenever 
an
+// external_identity::create event is published.
+func (b *Bus) SubscribeExternalIdentityCreated(handler 
ExternalIdentityHandler) {
+       b.subscribeExternalIdentity(ExternalIdentityCreateEvent, handler)
+}
+
+// SubscribeExternalIdentityUpdated registers a typed handler invoked whenever 
an
+// external_identity::update event is published.
+func (b *Bus) SubscribeExternalIdentityUpdated(handler 
ExternalIdentityHandler) {
+       b.subscribeExternalIdentity(ExternalIdentityUpdateEvent, handler)
+}
+
+// SubscribeExternalIdentityDeleted registers a typed handler invoked whenever 
an
+// external_identity::delete event is published.
+func (b *Bus) SubscribeExternalIdentityDeleted(handler 
ExternalIdentityHandler) {
+       b.subscribeExternalIdentity(ExternalIdentityDeleteEvent, handler)
+}
+
+// subscribeExternalIdentity registers handler on topic through b.Subscribe,
+// adapting the bus's untyped payload to a models.ExternalIdentity. Value and
+// pointer payloads are both accepted (a pointer is dereferenced before the
+// call); a nil pointer is dropped without calling handler, and any other
+// payload type is logged at warn level and ignored.
+func (b *Bus) subscribeExternalIdentity(topic EventType, handler 
ExternalIdentityHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch e := value.(type) {
+               case models.ExternalIdentity:
+                       handler(e)
+               case *models.ExternalIdentity:
+                       // A nil pointer carries no record to deliver; skip it.
+                       if e != nil {
+                               handler(*e)
+                       }
+               default:
+                       slog.Warn("external identity event payload has 
unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/types.go b/pkg/events/types.go
index 11c900e94..0d274b83d 100644
--- a/pkg/events/types.go
+++ b/pkg/events/types.go
@@ -96,6 +96,19 @@ const (
        ComputeAllocationResourceMappingDeleteEvent EventType = 
"compute_allocation_resource_mapping::delete"
 )
 
+// ExternalIdentity lifecycle message types.
+const (
+       ExternalIdentityCreateEvent EventType = "external_identity::create"
+       ExternalIdentityUpdateEvent EventType = "external_identity::update"
+       ExternalIdentityDeleteEvent EventType = "external_identity::delete"
+)
+
+// UserDN lifecycle message types. DNs are append-only credentials; no update 
topic.
+const (
+       UserDNCreateEvent EventType = "user_dn::create"
+       UserDNDeleteEvent EventType = "user_dn::delete"
+)
+
 // Event represents a change in the system that downstream consumers may be 
interested in.
 // The payload is the full record after the change (e.g. the
 // new state of a project after an update).
diff --git a/pkg/events/user_dn_subscribe.go b/pkg/events/user_dn_subscribe.go
new file mode 100644
index 000000000..67b14fd33
--- /dev/null
+++ b/pkg/events/user_dn_subscribe.go
@@ -0,0 +1,40 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// UserDNHandler handles user DN lifecycle events with a typed payload.
+type UserDNHandler func(d models.UserDN)
+
+// SubscribeUserDNCreated registers a typed handler invoked whenever a
+// user_dn::create event is published.
+func (b *Bus) SubscribeUserDNCreated(handler UserDNHandler) {
+       b.subscribeUserDN(UserDNCreateEvent, handler)
+}
+
+// SubscribeUserDNDeleted registers a typed handler invoked whenever a
+// user_dn::delete event is published.
+func (b *Bus) SubscribeUserDNDeleted(handler UserDNHandler) {
+       b.subscribeUserDN(UserDNDeleteEvent, handler)
+}
+
+// subscribeUserDN registers handler on topic through b.Subscribe, adapting the
+// bus's untyped payload to a models.UserDN. Value and pointer payloads are
+// both accepted (a pointer is dereferenced before the call); a nil pointer is
+// dropped without calling handler, and any other payload type is logged at
+// warn level and ignored.
+func (b *Bus) subscribeUserDN(topic EventType, handler UserDNHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch d := value.(type) {
+               case models.UserDN:
+                       handler(d)
+               case *models.UserDN:
+                       // A nil pointer carries no record to deliver; skip it.
+                       if d != nil {
+                               handler(*d)
+                       }
+               default:
+                       slog.Warn("user dn event payload has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/models/identity.go b/pkg/models/identity.go
new file mode 100644
index 000000000..5220795e0
--- /dev/null
+++ b/pkg/models/identity.go
@@ -0,0 +1,27 @@
+package models
+
+import "time"
+
+// ExternalIdentity links a User to its identifier in an external system
+// (ACCESS, NAIRR, CILogon, etc.). One user may have many external identities.
+//
+// Source-specific attributes (e.g. NSF status code, ACCESS org code) belong in
+// Metadata as a JSON-encoded blob. 'core' does not validate its shape.
+type ExternalIdentity struct {
+       ID         string    `json:"id"                  db:"id"`
+       UserID     string    `json:"user_id"             db:"user_id"`
+       Source     string    `json:"source"              db:"source"`      // 
e.g. access, nairr, cilogon, internal
+       ExternalID string    `json:"external_id"         db:"external_id"` // 
the source's native identifier
+       OIDCSub    string    `json:"oidc_sub,omitempty"  db:"oidc_sub"`    // 
OIDC subject when the source issues one
+       Metadata   string    `json:"metadata,omitempty"  db:"metadata"`    // 
JSON-encoded source-specific fields
+       CreatedAt  time.Time `json:"created_at"          db:"created_at"`
+}
+
+// UserDN binds an X.509 distinguished name (e.g. mTLS client cert subject) to 
a
+// User. Append-only: DNs are credentials and are added or removed, never 
edited.
+type UserDN struct {
+       ID        string    `json:"id"         db:"id"`
+       UserID    string    `json:"user_id"    db:"user_id"`
+       DN        string    `json:"dn"         db:"dn"`
+       CreatedAt time.Time `json:"created_at" db:"created_at"`
+}
diff --git a/pkg/service/external_identity.go b/pkg/service/external_identity.go
new file mode 100644
index 000000000..88799b904
--- /dev/null
+++ b/pkg/service/external_identity.go
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// CreateExternalIdentity persists a new external identity. If e.ID is empty, a
+// new UUID is generated. The referenced user must already exist and the
+// (source, external_id) pair must be unique.
+func (s *Service) CreateExternalIdentity(ctx context.Context, e 
*models.ExternalIdentity) (*models.ExternalIdentity, error) {
+       if e == nil {
+               return nil, fmt.Errorf("%w: external identity is nil", 
ErrInvalidInput)
+       }
+       if e.UserID == "" {
+               return nil, fmt.Errorf("%w: external identity user_id is 
required", ErrInvalidInput)
+       }
+       if e.Source == "" {
+               return nil, fmt.Errorf("%w: external identity source is 
required", ErrInvalidInput)
+       }
+       if e.ExternalID == "" {
+               return nil, fmt.Errorf("%w: external identity external_id is 
required", ErrInvalidInput)
+       }
+
+       if user, err := s.users.FindByID(ctx, e.UserID); err != nil {
+               return nil, fmt.Errorf("verify user: %w", err)
+       } else if user == nil {
+               return nil, fmt.Errorf("%w: user %q does not exist", 
ErrInvalidInput, e.UserID)
+       }
+
+       if existing, err := s.extIDs.FindBySourceAndExternalID(ctx, e.Source, 
e.ExternalID); err != nil {
+               return nil, fmt.Errorf("lookup external identity: %w", err)
+       } else if existing != nil {
+               return nil, fmt.Errorf("%w: external identity for source %q, 
external_id %q", ErrAlreadyExists, e.Source, e.ExternalID)
+       }
+
+       if e.ID == "" {
+               e.ID = newID()
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.extIDs.Create(ctx, tx, e)
+       }); err != nil {
+               return nil, fmt.Errorf("create external identity: %w", err)
+       }
+
+       s.eventBus.Publish(events.ExternalIdentityCreateEvent, e)
+       return e, nil
+}
+
+// GetExternalIdentity retrieves an external identity by ID. Returns
+// ErrNotFound when no row matches.
+func (s *Service) GetExternalIdentity(ctx context.Context, id string) 
(*models.ExternalIdentity, error) {
+       e, err := s.extIDs.FindByID(ctx, id)
+       if err != nil {
+               return nil, fmt.Errorf("get external identity: %w", err)
+       }
+       if e == nil {
+               return nil, ErrNotFound
+       }
+       return e, nil
+}
+
+// GetExternalIdentityBySourceAndExternalID retrieves the unique external
+// identity for the given (source, external_id) pair.
+func (s *Service) GetExternalIdentityBySourceAndExternalID(ctx 
context.Context, source, externalID string) (*models.ExternalIdentity, error) {
+       e, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID)
+       if err != nil {
+               return nil, fmt.Errorf("get external identity by 
source/external_id: %w", err)
+       }
+       if e == nil {
+               return nil, ErrNotFound
+       }
+       return e, nil
+}
+
+// GetExternalIdentityByOIDCSub retrieves the first external identity matching
+// the given OIDC subject.
+func (s *Service) GetExternalIdentityByOIDCSub(ctx context.Context, oidcSub 
string) (*models.ExternalIdentity, error) {
+       if oidcSub == "" {
+               return nil, fmt.Errorf("%w: oidc_sub is required", 
ErrInvalidInput)
+       }
+       e, err := s.extIDs.FindByOIDCSub(ctx, oidcSub)
+       if err != nil {
+               return nil, fmt.Errorf("get external identity by oidc_sub: %w", 
err)
+       }
+       if e == nil {
+               return nil, ErrNotFound
+       }
+       return e, nil
+}
+
+// ListExternalIdentitiesForUser returns every external identity belonging to
+// the given user.
+func (s *Service) ListExternalIdentitiesForUser(ctx context.Context, userID 
string) ([]models.ExternalIdentity, error) {
+       out, err := s.extIDs.FindByUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("list external identities by user: %w", 
err)
+       }
+       return out, nil
+}
+
+// UpdateExternalIdentity replaces the mutable fields (user_id, source,
+// external_id, oidc_sub, metadata) of an existing external identity.
+//
+// It returns ErrInvalidInput when e is nil or when id, user_id, source or
+// external_id is empty, and ErrNotFound when no row has the given ID. On
+// success e.CreatedAt is filled in from the stored row, so the published event
+// and whatever the caller does with e afterwards see the stored creation time
+// instead of a zero timestamp.
+func (s *Service) UpdateExternalIdentity(ctx context.Context, e *models.ExternalIdentity) error {
+       if e == nil || e.ID == "" {
+               return fmt.Errorf("%w: external identity id is required", ErrInvalidInput)
+       }
+       // The store overwrites every mutable column, so apply the same
+       // required-field checks as CreateExternalIdentity to avoid blanking
+       // them out.
+       if e.UserID == "" {
+               return fmt.Errorf("%w: external identity user_id is required", ErrInvalidInput)
+       }
+       if e.Source == "" {
+               return fmt.Errorf("%w: external identity source is required", ErrInvalidInput)
+       }
+       if e.ExternalID == "" {
+               return fmt.Errorf("%w: external identity external_id is required", ErrInvalidInput)
+       }
+
+       // An UPDATE that matches no row is not an error, so look the row up
+       // first; otherwise an unknown ID would "succeed" and emit an event.
+       existing, err := s.extIDs.FindByID(ctx, e.ID)
+       if err != nil {
+               return fmt.Errorf("lookup external identity: %w", err)
+       }
+       if existing == nil {
+               return ErrNotFound
+       }
+       // created_at is not part of the update; carry it over for the payload.
+       e.CreatedAt = existing.CreatedAt
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.extIDs.Update(ctx, tx, e)
+       }); err != nil {
+               return fmt.Errorf("update external identity: %w", err)
+       }
+
+       s.eventBus.Publish(events.ExternalIdentityUpdateEvent, e)
+       return nil
+}
+
+// DeleteExternalIdentity removes an external identity by ID.
+func (s *Service) DeleteExternalIdentity(ctx context.Context, id string) error 
{
+       if id == "" {
+               return fmt.Errorf("%w: external identity id is required", 
ErrInvalidInput)
+       }
+       e, err := s.extIDs.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup external identity: %w", err)
+       }
+       if e == nil {
+               return ErrNotFound
+       }
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.extIDs.Delete(ctx, tx, id)
+       }); err != nil {
+               return fmt.Errorf("delete external identity: %w", err)
+       }
+
+       s.eventBus.Publish(events.ExternalIdentityDeleteEvent, e)
+       return nil
+}
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 935630ac5..aa9538a52 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -49,6 +49,8 @@ type Service struct {
        changeEvents     store.ComputeAllocationChangeRequestEventStore
        memberships      store.ComputeAllocationMembershipStore
        usages           store.ComputeAllocationUsageStore
+       extIDs           store.ExternalIdentityStore
+       userDNs          store.UserDNStore
 }
 
 // New constructs a Service backed by the supplied database handle.
@@ -70,6 +72,8 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service {
                changeEvents:     
store.NewComputeAllocationChangeRequestEventStore(database),
                memberships:      
store.NewComputeAllocationMembershipStore(database),
                usages:           
store.NewComputeAllocationUsageStore(database),
+               extIDs:           store.NewExternalIdentityStore(database),
+               userDNs:          store.NewUserDNStore(database),
        }
 }
 
@@ -92,6 +96,8 @@ func NewWithStores(
        changeEvents store.ComputeAllocationChangeRequestEventStore,
        memberships store.ComputeAllocationMembershipStore,
        usages store.ComputeAllocationUsageStore,
+       extIDs store.ExternalIdentityStore,
+       userDNs store.UserDNStore,
 ) *Service {
        return &Service{
                db:               database,
@@ -109,6 +115,8 @@ func NewWithStores(
                changeEvents:     changeEvents,
                memberships:      memberships,
                usages:           usages,
+               extIDs:           extIDs,
+               userDNs:          userDNs,
        }
 }
 
diff --git a/pkg/service/user_dn.go b/pkg/service/user_dn.go
new file mode 100644
index 000000000..65e07d3cf
--- /dev/null
+++ b/pkg/service/user_dn.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// AddUserDN binds the DN in d to the user d.UserID and returns d.
+//
+// d must be non-nil with a non-empty UserID and DN, and the user must be
+// found by s.users.FindByID; otherwise an error wrapping ErrInvalidInput is
+// returned. If FindByDN already returns a binding for d.DN, an error wrapping
+// ErrAlreadyExists is returned. When d.ID is empty it is filled in from
+// newID() (mutating the caller's value) before the row is created via
+// s.inTx. UserDNCreateEvent is published with d after a successful create.
+//
+// NOTE(review): the duplicate check is keyed on the DN alone, so a DN bound
+// to any user is rejected; confirm this matches the table's uniqueness
+// constraint. The check also runs before (not inside) the s.inTx call, so a
+// concurrent insert would presumably surface as a plain "add user dn" error
+// rather than ErrAlreadyExists -- verify against the store.
+func (s *Service) AddUserDN(ctx context.Context, d *models.UserDN) (*models.UserDN, error) {
+       switch {
+       case d == nil:
+               return nil, fmt.Errorf("%w: user dn is nil", ErrInvalidInput)
+       case d.UserID == "":
+               return nil, fmt.Errorf("%w: user dn user_id is required", ErrInvalidInput)
+       case d.DN == "":
+               return nil, fmt.Errorf("%w: user dn dn is required", ErrInvalidInput)
+       }
+
+       owner, err := s.users.FindByID(ctx, d.UserID)
+       if err != nil {
+               return nil, fmt.Errorf("verify user: %w", err)
+       }
+       if owner == nil {
+               return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, d.UserID)
+       }
+
+       bound, err := s.userDNs.FindByDN(ctx, d.DN)
+       if err != nil {
+               return nil, fmt.Errorf("lookup user dn: %w", err)
+       }
+       if bound != nil {
+               return nil, fmt.Errorf("%w: dn %q", ErrAlreadyExists, d.DN)
+       }
+
+       // Assign the ID only once every check has passed, so rejected input
+       // is handed back untouched.
+       if d.ID == "" {
+               d.ID = newID()
+       }
+
+       create := func(tx *sql.Tx) error {
+               return s.userDNs.Create(ctx, tx, d)
+       }
+       if txErr := s.inTx(ctx, create); txErr != nil {
+               return nil, fmt.Errorf("add user dn: %w", txErr)
+       }
+
+       s.eventBus.Publish(events.UserDNCreateEvent, d)
+       return d, nil
+}
+
+// GetUserDN loads the DN binding with the given id. It returns ErrNotFound
+// when the store's FindByID yields nil and wraps any store error with the
+// "get user dn" prefix. id is passed to the store unvalidated.
+func (s *Service) GetUserDN(ctx context.Context, id string) (*models.UserDN, error) {
+       binding, findErr := s.userDNs.FindByID(ctx, id)
+       switch {
+       case findErr != nil:
+               return nil, fmt.Errorf("get user dn: %w", findErr)
+       case binding == nil:
+               return nil, ErrNotFound
+       default:
+               return binding, nil
+       }
+}
+
+// GetUserDNByDN resolves a DN string to its binding through the store's
+// FindByDN. It returns an error wrapping ErrInvalidInput for an empty dn,
+// ErrNotFound when the store yields nil, and wraps any store error with the
+// "get user dn by dn" prefix.
+func (s *Service) GetUserDNByDN(ctx context.Context, dn string) (*models.UserDN, error) {
+       if dn == "" {
+               return nil, fmt.Errorf("%w: dn is required", ErrInvalidInput)
+       }
+
+       binding, findErr := s.userDNs.FindByDN(ctx, dn)
+       switch {
+       case findErr != nil:
+               return nil, fmt.Errorf("get user dn by dn: %w", findErr)
+       case binding == nil:
+               return nil, ErrNotFound
+       default:
+               return binding, nil
+       }
+}
+
+// ListUserDNs returns the DN bindings that the store's FindByUser lookup
+// yields for userID. A store failure is wrapped with the "list user dns"
+// prefix and returned together with a nil slice. userID is passed to the
+// store unvalidated.
+func (s *Service) ListUserDNs(ctx context.Context, userID string) ([]models.UserDN, error) {
+       bindings, findErr := s.userDNs.FindByUser(ctx, userID)
+       if findErr != nil {
+               return nil, fmt.Errorf("list user dns: %w", findErr)
+       }
+
+       return bindings, nil
+}
+
+// RemoveUserDN deletes the DN binding with the given id and publishes
+// UserDNDeleteEvent carrying the binding that was loaded before the delete.
+//
+// It returns an error wrapping ErrInvalidInput for an empty id, ErrNotFound
+// when FindByID yields nil, and a wrapped error when the lookup or the
+// s.inTx call fails.
+func (s *Service) RemoveUserDN(ctx context.Context, id string) error {
+       if id == "" {
+               return fmt.Errorf("%w: user dn id is required", ErrInvalidInput)
+       }
+
+       // Load the binding first: it is the payload of the delete event below.
+       binding, lookupErr := s.userDNs.FindByID(ctx, id)
+       switch {
+       case lookupErr != nil:
+               return fmt.Errorf("lookup user dn: %w", lookupErr)
+       case binding == nil:
+               return ErrNotFound
+       }
+
+       remove := func(tx *sql.Tx) error {
+               return s.userDNs.Delete(ctx, tx, id)
+       }
+       if txErr := s.inTx(ctx, remove); txErr != nil {
+               return fmt.Errorf("remove user dn: %w", txErr)
+       }
+
+       s.eventBus.Publish(events.UserDNDeleteEvent, binding)
+       return nil
+}


Reply via email to