This is an automated email from the ASF dual-hosted git repository.

DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit ceb9d4d9c72ccce421d3e67f541375218ae584c4
Author: DImuthuUpe <[email protected]>
AuthorDate: Sun May 17 20:23:28 2026 -0400

    Added subscription layer for connectors to receive allocation messages
---
 cmd/server/main.go                                 |   6 +-
 pkg/events/bus.go                                  |  44 ++++++++
 pkg/events/compute_allocation_diff_subscribe.go    |  48 +++++++++
 .../compute_allocation_membership_subscribe.go     |  48 +++++++++
 ...ompute_allocation_resource_mapping_subscribe.go |  48 +++++++++
 .../compute_allocation_resource_subscribe.go       |  48 +++++++++
 pkg/events/compute_allocation_subscribe.go         |  48 +++++++++
 pkg/events/organization_subscribe.go               |  47 +++++++++
 pkg/events/project_subscribe.go                    |  49 +++++++++
 pkg/events/types.go                                | 115 +++++++++++++++++++++
 pkg/events/user_subscribe.go                       |  47 +++++++++
 pkg/service/compute_allocation.go                  |  14 +++
 pkg/service/compute_allocation_diff.go             |  12 +++
 pkg/service/compute_allocation_membership.go       |  11 ++
 pkg/service/compute_allocation_resource.go         |  14 +++
 pkg/service/compute_allocation_resource_mapping.go |   5 +
 pkg/service/compute_cluster.go                     |  14 +++
 pkg/service/organization.go                        |  14 +++
 pkg/service/project.go                             |  14 +++
 pkg/service/service.go                             |   7 +-
 20 files changed, 651 insertions(+), 2 deletions(-)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 08f9dd748..978d5cc92 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/airavata-custos/internal/db"
        "github.com/apache/airavata-custos/internal/server"
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
 
@@ -68,7 +69,10 @@ func run() error {
                return err
        }
 
-       svc := service.New(database)
+       // Create the event bus used for asynchronous messaging between the service 
and connectors
+       eventBus := events.New()
+
+       svc := service.New(database, eventBus)
        handler := server.LoggingMiddleware(server.New(svc))
 
        httpServer := &http.Server{
diff --git a/pkg/events/bus.go b/pkg/events/bus.go
new file mode 100644
index 000000000..e2e419b25
--- /dev/null
+++ b/pkg/events/bus.go
@@ -0,0 +1,44 @@
+package events
+
+func New() *Bus {
+       return &Bus{
+               subs: make(map[string][]EventSubscriberFunc),
+       }
+}
+
+// Subscribe registers a handler for a given topic.
+// The handler runs in its own goroutine for events sent with Publish and
+// in the publisher's goroutine for events sent with PublishSync.
+func (b *Bus) Subscribe(topic EventType, handler EventSubscriberFunc) {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+       b.subs[string(topic)] = append(b.subs[string(topic)], handler)
+}
+
+// Publish sends an event to all subscribers of the given topic.
+// Each handler runs in its own goroutine so publishers never block.
+func (b *Bus) Publish(topic EventType, payload any) {
+       b.mu.RLock()
+       handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)]))
+       copy(handlers, b.subs[string(topic)])
+       b.mu.RUnlock()
+
+       event := Event{Type: topic, Payload: payload}
+       for _, h := range handlers {
+               go h(event, payload)
+       }
+}
+
+// PublishSync is like Publish but calls handlers in the caller's goroutine.
+// Useful when you need to guarantee ordering or want backpressure.
+func (b *Bus) PublishSync(topic EventType, payload any) {
+       b.mu.RLock()
+       handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)]))
+       copy(handlers, b.subs[string(topic)])
+       b.mu.RUnlock()
+
+       event := Event{Type: topic, Payload: payload}
+       for _, h := range handlers {
+               h(event, payload)
+       }
+}
diff --git a/pkg/events/compute_allocation_diff_subscribe.go 
b/pkg/events/compute_allocation_diff_subscribe.go
new file mode 100644
index 000000000..126aa2b4c
--- /dev/null
+++ b/pkg/events/compute_allocation_diff_subscribe.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeAllocationDiffHandler handles compute allocation diff lifecycle 
events with a typed payload.
+type ComputeAllocationDiffHandler func(diff models.ComputeAllocationDiff)
+
+// SubscribeComputeAllocationDiffCreated registers a typed handler invoked 
whenever a
+// compute_allocation_diff::create event is published. Events with payloads 
that are
+// not a models.ComputeAllocationDiff (or *models.ComputeAllocationDiff) are 
dropped
+// with a warning log.
+func (b *Bus) SubscribeComputeAllocationDiffCreated(handler 
ComputeAllocationDiffHandler) {
+       b.subscribeComputeAllocationDiff(ComputeAllocationDiffCreateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationDiffUpdated registers a typed handler invoked 
whenever a
+// compute_allocation_diff::update event is published.
+func (b *Bus) SubscribeComputeAllocationDiffUpdated(handler 
ComputeAllocationDiffHandler) {
+       b.subscribeComputeAllocationDiff(ComputeAllocationDiffUpdateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationDiffDeleted registers a typed handler invoked 
whenever a
+// compute_allocation_diff::delete event is published.
+func (b *Bus) SubscribeComputeAllocationDiffDeleted(handler 
ComputeAllocationDiffHandler) {
+       b.subscribeComputeAllocationDiff(ComputeAllocationDiffDeleteEvent, 
handler)
+}
+
+func (b *Bus) subscribeComputeAllocationDiff(topic EventType, handler 
ComputeAllocationDiffHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch d := value.(type) {
+               case models.ComputeAllocationDiff:
+                       handler(d)
+               case *models.ComputeAllocationDiff:
+                       if d != nil {
+                               handler(*d)
+                       }
+               default:
+                       slog.Warn("compute allocation diff event payload has 
unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/compute_allocation_membership_subscribe.go 
b/pkg/events/compute_allocation_membership_subscribe.go
new file mode 100644
index 000000000..79c3f6cf0
--- /dev/null
+++ b/pkg/events/compute_allocation_membership_subscribe.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeAllocationMembershipHandler handles compute allocation membership 
lifecycle events with a typed payload.
+type ComputeAllocationMembershipHandler func(membership 
models.ComputeAllocationMembership)
+
+// SubscribeComputeAllocationMembershipCreated registers a typed handler 
invoked whenever a
+// compute_allocation_membership::create event is published. Events with 
payloads that are
+// not a models.ComputeAllocationMembership (or 
*models.ComputeAllocationMembership) are
+// dropped with a warning log.
+func (b *Bus) SubscribeComputeAllocationMembershipCreated(handler 
ComputeAllocationMembershipHandler) {
+       
b.subscribeComputeAllocationMembership(ComputeAllocationMembershipCreateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationMembershipUpdated registers a typed handler 
invoked whenever a
+// compute_allocation_membership::update event is published.
+func (b *Bus) SubscribeComputeAllocationMembershipUpdated(handler 
ComputeAllocationMembershipHandler) {
+       
b.subscribeComputeAllocationMembership(ComputeAllocationMembershipUpdateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationMembershipDeleted registers a typed handler 
invoked whenever a
+// compute_allocation_membership::delete event is published.
+func (b *Bus) SubscribeComputeAllocationMembershipDeleted(handler 
ComputeAllocationMembershipHandler) {
+       
b.subscribeComputeAllocationMembership(ComputeAllocationMembershipDeleteEvent, 
handler)
+}
+
+func (b *Bus) subscribeComputeAllocationMembership(topic EventType, handler 
ComputeAllocationMembershipHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch m := value.(type) {
+               case models.ComputeAllocationMembership:
+                       handler(m)
+               case *models.ComputeAllocationMembership:
+                       if m != nil {
+                               handler(*m)
+                       }
+               default:
+                       slog.Warn("compute allocation membership event payload 
has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/compute_allocation_resource_mapping_subscribe.go 
b/pkg/events/compute_allocation_resource_mapping_subscribe.go
new file mode 100644
index 000000000..25522204a
--- /dev/null
+++ b/pkg/events/compute_allocation_resource_mapping_subscribe.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeAllocationResourceMappingHandler handles compute allocation resource 
mapping lifecycle events with a typed payload.
+type ComputeAllocationResourceMappingHandler func(mapping 
models.ComputeAllocationResourceMapping)
+
+// SubscribeComputeAllocationResourceMappingCreated registers a typed handler 
invoked whenever a
+// compute_allocation_resource_mapping::create event is published. Events with 
payloads that are
+// not a models.ComputeAllocationResourceMapping (or 
*models.ComputeAllocationResourceMapping) are
+// dropped with a warning log.
+func (b *Bus) SubscribeComputeAllocationResourceMappingCreated(handler 
ComputeAllocationResourceMappingHandler) {
+       
b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingCreateEvent,
 handler)
+}
+
+// SubscribeComputeAllocationResourceMappingUpdated registers a typed handler 
invoked whenever a
+// compute_allocation_resource_mapping::update event is published.
+func (b *Bus) SubscribeComputeAllocationResourceMappingUpdated(handler 
ComputeAllocationResourceMappingHandler) {
+       
b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingUpdateEvent,
 handler)
+}
+
+// SubscribeComputeAllocationResourceMappingDeleted registers a typed handler 
invoked whenever a
+// compute_allocation_resource_mapping::delete event is published.
+func (b *Bus) SubscribeComputeAllocationResourceMappingDeleted(handler 
ComputeAllocationResourceMappingHandler) {
+       
b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingDeleteEvent,
 handler)
+}
+
+func (b *Bus) subscribeComputeAllocationResourceMapping(topic EventType, 
handler ComputeAllocationResourceMappingHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch m := value.(type) {
+               case models.ComputeAllocationResourceMapping:
+                       handler(m)
+               case *models.ComputeAllocationResourceMapping:
+                       if m != nil {
+                               handler(*m)
+                       }
+               default:
+                       slog.Warn("compute allocation resource mapping event 
payload has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/compute_allocation_resource_subscribe.go 
b/pkg/events/compute_allocation_resource_subscribe.go
new file mode 100644
index 000000000..bca3308b5
--- /dev/null
+++ b/pkg/events/compute_allocation_resource_subscribe.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeAllocationResourceHandler handles compute allocation resource 
lifecycle events with a typed payload.
+type ComputeAllocationResourceHandler func(resource 
models.ComputeAllocationResource)
+
+// SubscribeComputeAllocationResourceCreated registers a typed handler invoked 
whenever a
+// compute_allocation_resource::create event is published. Events with 
payloads that are
+// not a models.ComputeAllocationResource (or 
*models.ComputeAllocationResource) are dropped
+// with a warning log.
+func (b *Bus) SubscribeComputeAllocationResourceCreated(handler 
ComputeAllocationResourceHandler) {
+       
b.subscribeComputeAllocationResource(ComputeAllocationResourceCreateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationResourceUpdated registers a typed handler invoked 
whenever a
+// compute_allocation_resource::update event is published.
+func (b *Bus) SubscribeComputeAllocationResourceUpdated(handler 
ComputeAllocationResourceHandler) {
+       
b.subscribeComputeAllocationResource(ComputeAllocationResourceUpdateEvent, 
handler)
+}
+
+// SubscribeComputeAllocationResourceDeleted registers a typed handler invoked 
whenever a
+// compute_allocation_resource::delete event is published.
+func (b *Bus) SubscribeComputeAllocationResourceDeleted(handler 
ComputeAllocationResourceHandler) {
+       
b.subscribeComputeAllocationResource(ComputeAllocationResourceDeleteEvent, 
handler)
+}
+
+func (b *Bus) subscribeComputeAllocationResource(topic EventType, handler 
ComputeAllocationResourceHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch r := value.(type) {
+               case models.ComputeAllocationResource:
+                       handler(r)
+               case *models.ComputeAllocationResource:
+                       if r != nil {
+                               handler(*r)
+                       }
+               default:
+                       slog.Warn("compute allocation resource event payload 
has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/compute_allocation_subscribe.go 
b/pkg/events/compute_allocation_subscribe.go
new file mode 100644
index 000000000..93fc2e3c8
--- /dev/null
+++ b/pkg/events/compute_allocation_subscribe.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeAllocationHandler handles compute allocation lifecycle events with a 
typed payload.
+type ComputeAllocationHandler func(allocation models.ComputeAllocation)
+
+// SubscribeComputeAllocationCreated registers a typed handler invoked 
whenever a
+// compute_allocation::create event is published. Events with payloads that are
+// not a models.ComputeAllocation (or *models.ComputeAllocation) are dropped
+// with a warning log.
+func (b *Bus) SubscribeComputeAllocationCreated(handler 
ComputeAllocationHandler) {
+       b.subscribeComputeAllocation(ComputeAllocationCreateEvent, handler)
+}
+
+// SubscribeComputeAllocationUpdated registers a typed handler invoked 
whenever a
+// compute_allocation::update event is published.
+func (b *Bus) SubscribeComputeAllocationUpdated(handler 
ComputeAllocationHandler) {
+       b.subscribeComputeAllocation(ComputeAllocationUpdateEvent, handler)
+}
+
+// SubscribeComputeAllocationDeleted registers a typed handler invoked 
whenever a
+// compute_allocation::delete event is published.
+func (b *Bus) SubscribeComputeAllocationDeleted(handler 
ComputeAllocationHandler) {
+       b.subscribeComputeAllocation(ComputeAllocationDeleteEvent, handler)
+}
+
+func (b *Bus) subscribeComputeAllocation(topic EventType, handler 
ComputeAllocationHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch a := value.(type) {
+               case models.ComputeAllocation:
+                       handler(a)
+               case *models.ComputeAllocation:
+                       if a != nil {
+                               handler(*a)
+                       }
+               default:
+                       slog.Warn("compute allocation event payload has 
unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/organization_subscribe.go 
b/pkg/events/organization_subscribe.go
new file mode 100644
index 000000000..68a6b528e
--- /dev/null
+++ b/pkg/events/organization_subscribe.go
@@ -0,0 +1,47 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// OrganizationHandler handles organization lifecycle events with a typed 
payload.
+type OrganizationHandler func(organization models.Organization)
+
+// SubscribeOrganizationCreated registers a typed handler invoked whenever an
+// organization::create event is published. Events with payloads that are not a
+// models.Organization (or *models.Organization) are dropped with a warning 
log.
+func (b *Bus) SubscribeOrganizationCreated(handler OrganizationHandler) {
+       b.subscribeOrganization(OrganizationCreateEvent, handler)
+}
+
+// SubscribeOrganizationUpdated registers a typed handler invoked whenever an
+// organization::update event is published.
+func (b *Bus) SubscribeOrganizationUpdated(handler OrganizationHandler) {
+       b.subscribeOrganization(OrganizationUpdateEvent, handler)
+}
+
+// SubscribeOrganizationDeleted registers a typed handler invoked whenever an
+// organization::delete event is published.
+func (b *Bus) SubscribeOrganizationDeleted(handler OrganizationHandler) {
+       b.subscribeOrganization(OrganizationDeleteEvent, handler)
+}
+
+func (b *Bus) subscribeOrganization(topic EventType, handler 
OrganizationHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch o := value.(type) {
+               case models.Organization:
+                       handler(o)
+               case *models.Organization:
+                       if o != nil {
+                               handler(*o)
+                       }
+               default:
+                       slog.Warn("organization event payload has unexpected 
type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/project_subscribe.go b/pkg/events/project_subscribe.go
new file mode 100644
index 000000000..da7cb7819
--- /dev/null
+++ b/pkg/events/project_subscribe.go
@@ -0,0 +1,49 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ProjectHandler handles project lifecycle events with a typed payload.
+type ProjectHandler func(project models.Project)
+
+// SubscribeProjectCreated registers a typed handler invoked whenever a
+// project::create event is published. Events with payloads that are not a
+// models.Project (or *models.Project) are dropped with a warning log.
+func (b *Bus) SubscribeProjectCreated(handler ProjectHandler) {
+       b.subscribeProject(ProjectCreateEvent, handler)
+}
+
+// SubscribeProjectUpdated registers a typed handler invoked whenever a
+// project::update event is published. Events with payloads that are not a
+// models.Project (or *models.Project) are dropped with a warning log.
+func (b *Bus) SubscribeProjectUpdated(handler ProjectHandler) {
+       b.subscribeProject(ProjectUpdateEvent, handler)
+}
+
+// SubscribeProjectDeleted registers a typed handler invoked whenever a
+// project::delete event is published. Events with payloads that are not a
+// models.Project (or *models.Project) are dropped with a warning log.
+func (b *Bus) SubscribeProjectDeleted(handler ProjectHandler) {
+       b.subscribeProject(ProjectDeleteEvent, handler)
+}
+
+func (b *Bus) subscribeProject(topic EventType, handler ProjectHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch p := value.(type) {
+               case models.Project:
+                       handler(p)
+               case *models.Project:
+                       if p != nil {
+                               handler(*p)
+                       }
+               default:
+                       slog.Warn("project event payload has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/types.go b/pkg/events/types.go
new file mode 100644
index 000000000..11c900e94
--- /dev/null
+++ b/pkg/events/types.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package events defines the in-memory event bus and the message types the
+// service emits for downstream consumers (audit, projections, integrations).
+package events
+
+import (
+       "sync"
+)
+
+// EventType identifies the kind of event carried on the bus.
+type EventType string
+
+// Project lifecycle message types.
+const (
+       ProjectCreateEvent EventType = "project::create"
+       ProjectUpdateEvent EventType = "project::update"
+       ProjectDeleteEvent EventType = "project::delete"
+)
+
+// User lifecycle message types.
+const (
+       UserCreateEvent EventType = "user::create"
+       UserUpdateEvent EventType = "user::update"
+       UserDeleteEvent EventType = "user::delete"
+)
+
+// Organization lifecycle message types.
+const (
+       OrganizationCreateEvent EventType = "organization::create"
+       OrganizationUpdateEvent EventType = "organization::update"
+       OrganizationDeleteEvent EventType = "organization::delete"
+)
+
+// ComputeCluster lifecycle message types.
+const (
+       ComputeClusterCreateEvent EventType = "compute_cluster::create"
+       ComputeClusterUpdateEvent EventType = "compute_cluster::update"
+       ComputeClusterDeleteEvent EventType = "compute_cluster::delete"
+)
+
+// ClusterAccount lifecycle message types.
+const (
+       ClusterAccountCreateEvent EventType = "cluster_account::create"
+       ClusterAccountUpdateEvent EventType = "cluster_account::update"
+       ClusterAccountDeleteEvent EventType = "cluster_account::delete"
+)
+
+// ComputeAllocation lifecycle message types.
+const (
+       ComputeAllocationCreateEvent EventType = "compute_allocation::create"
+       ComputeAllocationUpdateEvent EventType = "compute_allocation::update"
+       ComputeAllocationDeleteEvent EventType = "compute_allocation::delete"
+)
+
+// ComputeAllocationDiff lifecycle message types.
+const (
+       ComputeAllocationDiffCreateEvent EventType = 
"compute_allocation_diff::create"
+       ComputeAllocationDiffUpdateEvent EventType = 
"compute_allocation_diff::update"
+       ComputeAllocationDiffDeleteEvent EventType = 
"compute_allocation_diff::delete"
+)
+
+// ComputeAllocationResource lifecycle message types.
+const (
+       ComputeAllocationResourceCreateEvent EventType = 
"compute_allocation_resource::create"
+       ComputeAllocationResourceUpdateEvent EventType = 
"compute_allocation_resource::update"
+       ComputeAllocationResourceDeleteEvent EventType = 
"compute_allocation_resource::delete"
+)
+
+// ComputeAllocationMembership lifecycle message types.
+const (
+       ComputeAllocationMembershipCreateEvent EventType = 
"compute_allocation_membership::create"
+       ComputeAllocationMembershipUpdateEvent EventType = 
"compute_allocation_membership::update"
+       ComputeAllocationMembershipDeleteEvent EventType = 
"compute_allocation_membership::delete"
+)
+
+// ComputeAllocationResourceMapping lifecycle message types.
+const (
+       ComputeAllocationResourceMappingCreateEvent EventType = 
"compute_allocation_resource_mapping::create"
+       ComputeAllocationResourceMappingUpdateEvent EventType = 
"compute_allocation_resource_mapping::update"
+       ComputeAllocationResourceMappingDeleteEvent EventType = 
"compute_allocation_resource_mapping::delete"
+)
+
+// Event represents a change in the system that downstream consumers may be 
interested in.
+// The payload is the full record after the change (for deletes, the record
+// as it was read just before removal), e.g. a project's state after an update.
+type Event struct {
+       Type    EventType   `json:"type"`
+       Payload interface{} `json:"payload"`
+}
+
+// EventSubscriberFunc is a function type that can be registered to receive 
events from the bus.
+type EventSubscriberFunc func(event Event, value interface{})
+
+// Bus is a lightweight, in-memory, topic-based pub/sub event bus.
+// Modules publish and subscribe by topic without knowing about each other.
+type Bus struct {
+       mu   sync.RWMutex
+       subs map[string][]EventSubscriberFunc
+}
diff --git a/pkg/events/user_subscribe.go b/pkg/events/user_subscribe.go
new file mode 100644
index 000000000..7c08a49c5
--- /dev/null
+++ b/pkg/events/user_subscribe.go
@@ -0,0 +1,47 @@
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// UserHandler handles user lifecycle events with a typed payload.
+type UserHandler func(user models.User)
+
+// SubscribeUserCreated registers a typed handler invoked whenever a
+// user::create event is published. Events with payloads that are not a
+// models.User (or *models.User) are dropped with a warning log.
+func (b *Bus) SubscribeUserCreated(handler UserHandler) {
+       b.subscribeUser(UserCreateEvent, handler)
+}
+
+// SubscribeUserUpdated registers a typed handler invoked whenever a
+// user::update event is published.
+func (b *Bus) SubscribeUserUpdated(handler UserHandler) {
+       b.subscribeUser(UserUpdateEvent, handler)
+}
+
+// SubscribeUserDeleted registers a typed handler invoked whenever a
+// user::delete event is published.
+func (b *Bus) SubscribeUserDeleted(handler UserHandler) {
+       b.subscribeUser(UserDeleteEvent, handler)
+}
+
+func (b *Bus) subscribeUser(topic EventType, handler UserHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch u := value.(type) {
+               case models.User:
+                       handler(u)
+               case *models.User:
+                       if u != nil {
+                               handler(*u)
+                       }
+               default:
+                       slog.Warn("user event payload has unexpected type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/service/compute_allocation.go 
b/pkg/service/compute_allocation.go
index e51a624e4..47e010a48 100644
--- a/pkg/service/compute_allocation.go
+++ b/pkg/service/compute_allocation.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -65,6 +66,8 @@ func (s *Service) CreateComputeAllocation(ctx 
context.Context, alloc *models.Com
        }); err != nil {
                return nil, fmt.Errorf("create compute allocation: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationCreateEvent, alloc)
        return alloc, nil
 }
 
@@ -109,6 +112,8 @@ func (s *Service) UpdateComputeAllocation(ctx 
context.Context, alloc *models.Com
        }); err != nil {
                return fmt.Errorf("update compute allocation: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationUpdateEvent, alloc)
        return nil
 }
 
@@ -117,10 +122,19 @@ func (s *Service) DeleteComputeAllocation(ctx 
context.Context, id string) error
        if id == "" {
                return fmt.Errorf("%w: compute allocation id is required", 
ErrInvalidInput)
        }
+       alloc, err := s.allocs.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup compute allocation: %w", err)
+       }
+       if alloc == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.allocs.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete compute allocation: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationDeleteEvent, alloc)
        return nil
 }
diff --git a/pkg/service/compute_allocation_diff.go 
b/pkg/service/compute_allocation_diff.go
index 49a414bf2..0375e1fee 100644
--- a/pkg/service/compute_allocation_diff.go
+++ b/pkg/service/compute_allocation_diff.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -61,6 +62,8 @@ func (s *Service) CreateComputeAllocationDiff(ctx 
context.Context, diff *models.
        }); err != nil {
                return nil, fmt.Errorf("create compute allocation diff: %w", 
err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationDiffCreateEvent, diff)
        return diff, nil
 }
 
@@ -113,10 +116,19 @@ func (s *Service) DeleteComputeAllocationDiff(ctx 
context.Context, id string) er
        if id == "" {
                return fmt.Errorf("%w: compute allocation diff id is required", 
ErrInvalidInput)
        }
+       diff, err := s.allocDiffs.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup compute allocation diff: %w", err)
+       }
+       if diff == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.allocDiffs.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete compute allocation diff: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationDiffDeleteEvent, diff)
        return nil
 }
diff --git a/pkg/service/compute_allocation_membership.go 
b/pkg/service/compute_allocation_membership.go
index b010224c5..9d6e0056d 100644
--- a/pkg/service/compute_allocation_membership.go
+++ b/pkg/service/compute_allocation_membership.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -69,6 +70,8 @@ func (s *Service) CreateComputeAllocationMembership(ctx 
context.Context, m *mode
        }); err != nil {
                return nil, fmt.Errorf("create compute allocation membership: 
%w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationMembershipCreateEvent, m)
        return m, nil
 }
 
@@ -146,6 +149,8 @@ func (s *Service) UpdateComputeAllocationMembership(ctx 
context.Context, m *mode
        }); err != nil {
                return nil, fmt.Errorf("update compute allocation membership: 
%w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, m)
        return m, nil
 }
 
@@ -171,6 +176,8 @@ func (s *Service) UpdateMembershipAllocationAmount(ctx 
context.Context, id strin
        }); err != nil {
                return nil, fmt.Errorf("update compute allocation membership 
amount: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, 
existing)
        return existing, nil
 }
 
@@ -196,6 +203,8 @@ func (s *Service) UpdateMembershipStatus(ctx 
context.Context, id string, status
        }); err != nil {
                return nil, fmt.Errorf("update compute allocation membership 
status: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, 
existing)
        return existing, nil
 }
 
@@ -216,5 +225,7 @@ func (s *Service) DeleteComputeAllocationMembership(ctx 
context.Context, id stri
        }); err != nil {
                return fmt.Errorf("delete compute allocation membership: %w", 
err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationMembershipDeleteEvent, 
existing)
        return nil
 }
diff --git a/pkg/service/compute_allocation_resource.go 
b/pkg/service/compute_allocation_resource.go
index 433f019d5..4efb28b51 100644
--- a/pkg/service/compute_allocation_resource.go
+++ b/pkg/service/compute_allocation_resource.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -46,6 +47,8 @@ func (s *Service) CreateComputeAllocationResource(ctx 
context.Context, resource
        }); err != nil {
                return nil, fmt.Errorf("create compute allocation resource: 
%w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationResourceCreateEvent, 
resource)
        return resource, nil
 }
 
@@ -81,6 +84,8 @@ func (s *Service) UpdateComputeAllocationResource(ctx 
context.Context, resource
        }); err != nil {
                return fmt.Errorf("update compute allocation resource: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationResourceUpdateEvent, 
resource)
        return nil
 }
 
@@ -89,10 +94,19 @@ func (s *Service) DeleteComputeAllocationResource(ctx 
context.Context, id string
        if id == "" {
                return fmt.Errorf("%w: compute allocation resource id is 
required", ErrInvalidInput)
        }
+       resource, err := s.resources.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup compute allocation resource: %w", err)
+       }
+       if resource == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.resources.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete compute allocation resource: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationResourceDeleteEvent, 
resource)
        return nil
 }
diff --git a/pkg/service/compute_allocation_resource_mapping.go 
b/pkg/service/compute_allocation_resource_mapping.go
index 84c4b52e0..d0c96014c 100644
--- a/pkg/service/compute_allocation_resource_mapping.go
+++ b/pkg/service/compute_allocation_resource_mapping.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -65,6 +66,8 @@ func (s *Service) AttachResourceToAllocation(ctx 
context.Context, allocationID,
        }); err != nil {
                return nil, fmt.Errorf("attach resource to allocation: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationResourceMappingCreateEvent, 
mapping)
        return mapping, nil
 }
 
@@ -89,6 +92,8 @@ func (s *Service) DetachResourceFromAllocation(ctx 
context.Context, allocationID
        }); err != nil {
                return fmt.Errorf("detach resource from allocation: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationResourceMappingDeleteEvent, 
existing)
        return nil
 }
 
diff --git a/pkg/service/compute_cluster.go b/pkg/service/compute_cluster.go
index 106be4a21..41198709b 100644
--- a/pkg/service/compute_cluster.go
+++ b/pkg/service/compute_cluster.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -49,6 +50,8 @@ func (s *Service) CreateComputeCluster(ctx context.Context, 
cluster *models.Comp
        }); err != nil {
                return nil, fmt.Errorf("create compute cluster: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationCreateEvent, cluster)
        return cluster, nil
 }
 
@@ -99,6 +102,8 @@ func (s *Service) UpdateComputeCluster(ctx context.Context, 
cluster *models.Comp
        }); err != nil {
                return fmt.Errorf("update compute cluster: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationUpdateEvent, cluster)
        return nil
 }
 
@@ -107,10 +112,19 @@ func (s *Service) DeleteComputeCluster(ctx 
context.Context, id string) error {
        if id == "" {
                return fmt.Errorf("%w: compute cluster id is required", 
ErrInvalidInput)
        }
+       cluster, err := s.clusters.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup compute cluster: %w", err)
+       }
+       if cluster == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.clusters.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete compute cluster: %w", err)
        }
+
+       s.eventBus.Publish(events.ComputeAllocationDeleteEvent, cluster)
        return nil
 }
diff --git a/pkg/service/organization.go b/pkg/service/organization.go
index bca060a06..50effd1e7 100644
--- a/pkg/service/organization.go
+++ b/pkg/service/organization.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -51,6 +52,8 @@ func (s *Service) CreateOrganization(ctx context.Context, org 
*models.Organizati
        }); err != nil {
                return nil, fmt.Errorf("create organization: %w", err)
        }
+
+       s.eventBus.Publish(events.OrganizationCreateEvent, org)
        return org, nil
 }
 
@@ -89,6 +92,8 @@ func (s *Service) UpdateOrganization(ctx context.Context, org 
*models.Organizati
        }); err != nil {
                return fmt.Errorf("update organization: %w", err)
        }
+
+       s.eventBus.Publish(events.OrganizationUpdateEvent, org)
        return nil
 }
 
@@ -97,10 +102,19 @@ func (s *Service) DeleteOrganization(ctx context.Context, 
id string) error {
        if id == "" {
                return fmt.Errorf("%w: organization id is required", 
ErrInvalidInput)
        }
+       org, err := s.orgs.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup organization: %w", err)
+       }
+       if org == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.orgs.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete organization: %w", err)
        }
+
+       s.eventBus.Publish(events.OrganizationDeleteEvent, org)
        return nil
 }
diff --git a/pkg/service/project.go b/pkg/service/project.go
index f503a537d..938ff4f8e 100644
--- a/pkg/service/project.go
+++ b/pkg/service/project.go
@@ -22,6 +22,7 @@ import (
        "database/sql"
        "fmt"
 
+       "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
 )
 
@@ -64,6 +65,8 @@ func (s *Service) CreateProject(ctx context.Context, project 
*models.Project) (*
        }); err != nil {
                return nil, fmt.Errorf("create project: %w", err)
        }
+
+       s.eventBus.Publish(events.ProjectCreateEvent, project)
        return project, nil
 }
 
@@ -110,6 +113,8 @@ func (s *Service) UpdateProject(ctx context.Context, 
project *models.Project) er
        }); err != nil {
                return fmt.Errorf("update project: %w", err)
        }
+
+       s.eventBus.Publish(events.ProjectUpdateEvent, project)
        return nil
 }
 
@@ -118,10 +123,19 @@ func (s *Service) DeleteProject(ctx context.Context, id 
string) error {
        if id == "" {
                return fmt.Errorf("%w: project id is required", ErrInvalidInput)
        }
+       project, err := s.projs.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup project: %w", err)
+       }
+       if project == nil {
+               return ErrNotFound
+       }
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.projs.Delete(ctx, tx, id)
        }); err != nil {
                return fmt.Errorf("delete project: %w", err)
        }
+
+       s.eventBus.Publish(events.ProjectDeleteEvent, project)
        return nil
 }
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 6fd70c88f..935630ac5 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/apache/airavata-custos/internal/db"
        "github.com/apache/airavata-custos/internal/store"
+       "github.com/apache/airavata-custos/pkg/events"
 )
 
 // Service is a high-level façade over the underlying stores. It wraps each
@@ -34,6 +35,7 @@ import (
 // *sql.Tx themselves.
 type Service struct {
        db               *sqlx.DB
+       eventBus         *events.Bus
        orgs             store.OrganizationStore
        users            store.UserStore
        projs            store.ProjectStore
@@ -51,9 +53,10 @@ type Service struct {
 
 // New constructs a Service backed by the supplied database handle.
 // Stores are instantiated internally using the default MySQL implementations.
-func New(database *sqlx.DB) *Service {
+func New(database *sqlx.DB, eventBus *events.Bus) *Service {
        return &Service{
                db:               database,
+               eventBus:         eventBus,
                orgs:             store.NewOrganizationStore(database),
                users:            store.NewUserStore(database),
                projs:            store.NewProjectStore(database),
@@ -75,6 +78,7 @@ func New(database *sqlx.DB) *Service {
 // external callers.
 func NewWithStores(
        database *sqlx.DB,
+       eventBus *events.Bus,
        orgs store.OrganizationStore,
        users store.UserStore,
        projs store.ProjectStore,
@@ -91,6 +95,7 @@ func NewWithStores(
 ) *Service {
        return &Service{
                db:               database,
+               eventBus:         eventBus,
                orgs:             orgs,
                users:            users,
                projs:            projs,


Reply via email to