This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch access-integration
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 2f7de87c2cd737fbaacfd8524633607ebb0b8ce1
Author: lahiruj <[email protected]>
AuthorDate: Tue May 19 02:14:25 2026 -0400

    Add connector registry pattern with per-connector migrations
---
 cmd/server/main.go         | 19 ++++++++--
 internal/db/migrate.go     | 30 +++++++++++++++
 pkg/connectors/registry.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 978d5cc92..d6d59497b 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/airavata-custos/internal/db"
        "github.com/apache/airavata-custos/internal/server"
+       "github.com/apache/airavata-custos/pkg/connectors"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -73,6 +74,21 @@ func run() error {
        eventBus := events.New()
 
        svc := service.New(database, eventBus)
+
+       ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, 
syscall.SIGTERM)
+       defer stop()
+
+       for _, c := range connectors.All() {
+               if migFS, dir := c.Migrations(); migFS != nil {
+                       if err := db.MigrateConnectorFS(database, migFS, dir, 
c.Name()); err != nil {
+                               return err
+                       }
+               }
+               if err := c.Start(ctx, connectors.Deps{DB: database, Service: 
svc, Bus: eventBus}); err != nil {
+                       return err
+               }
+       }
+
        handler := server.LoggingMiddleware(server.New(svc))
 
        httpServer := &http.Server{
@@ -84,9 +100,6 @@ func run() error {
                IdleTimeout:       120 * time.Second,
        }
 
-       ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, 
syscall.SIGTERM)
-       defer stop()
-
        serverErr := make(chan error, 1)
        go func() {
                slog.Info("http server listening", "addr", addr)
diff --git a/internal/db/migrate.go b/internal/db/migrate.go
index 446c26b9e..b447eab74 100644
--- a/internal/db/migrate.go
+++ b/internal/db/migrate.go
@@ -20,6 +20,7 @@ package db
 import (
        "errors"
        "fmt"
+       "io/fs"
        "log/slog"
 
        "github.com/golang-migrate/migrate/v4"
@@ -53,3 +54,32 @@ func MigrateEmbedded(database *sqlx.DB) error {
        slog.Info("database migrations applied successfully")
        return nil
 }
+
+// MigrateConnectorFS applies migrations from the supplied embed.FS to the
+// supplied database, tracking version state in a per-connector table named
+// schema_migrations_<name>. Connectors invoke this via the host so version
+// sequences never collide across connectors or with core.
+func MigrateConnectorFS(database *sqlx.DB, src fs.FS, dir, name string) error {
+       driver, err := mysql.WithInstance(database.DB, &mysql.Config{
+               MigrationsTable: "schema_migrations_" + name,
+       })
+       if err != nil {
+               return fmt.Errorf("create migration driver for %s: %w", name, 
err)
+       }
+
+       source, err := iofs.New(src, dir)
+       if err != nil {
+               return fmt.Errorf("create migration source for %s: %w", name, 
err)
+       }
+
+       m, err := migrate.NewWithInstance("iofs", source, "mysql", driver)
+       if err != nil {
+               return fmt.Errorf("create migrator for %s: %w", name, err)
+       }
+
+       if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
+               return fmt.Errorf("run migrations for %s: %w", name, err)
+       }
+       slog.Info("connector migrations applied", "connector", name)
+       return nil
+}
diff --git a/pkg/connectors/registry.go b/pkg/connectors/registry.go
new file mode 100644
index 000000000..9c807d266
--- /dev/null
+++ b/pkg/connectors/registry.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package connectors is the registry through which external-system connectors
+// (ACCESS AMIE, NAIRR, etc.) plug into the core binary.
+//
+// A deployment selects which connectors are bundled by importing them
+// blank in cmd/server/connectors.go. Each connector's init() function calls
+// Register, contributing both its embedded migrations and its Start function
+// to the runtime.
+package connectors
+
+import (
+       "context"
+       "io/fs"
+       "sort"
+       "sync"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+// Deps is the bundle the host hands to every connector at startup.
+type Deps struct {
+       DB      *sqlx.DB
+       Service *service.Service
+       Bus     *events.Bus
+}
+
+// Connector is a long-running subsystem that integrates an external system
+// with core. The host applies its migrations first, then calls Start.
+type Connector interface {
+       // Name returns a unique slug for this connector (used to namespace the
+       // per-connector schema_migrations table).
+       Name() string
+       // Migrations returns the embedded migration tree and the subdirectory
+       // within it that holds the .sql files. Return (nil, "") if the 
connector
+       // has no migrations.
+       Migrations() (fs.FS, string)
+       // Start kicks off the connector. It must return promptly; long-running
+       // work belongs in goroutines that respect ctx cancellation.
+       Start(ctx context.Context, deps Deps) error
+}
+
+var (
+       mu       sync.Mutex
+       registry = map[string]Connector{}
+)
+
+// Register adds a connector to the global registry. Typically called from an
+// init() function in the connector's package. Re-registration with the same
+// Name panics — connector names must be unique.
+func Register(c Connector) {
+       mu.Lock()
+       defer mu.Unlock()
+       if _, ok := registry[c.Name()]; ok {
+               panic("connectors: duplicate registration for " + c.Name())
+       }
+       registry[c.Name()] = c
+}
+
+// All returns the registered connectors in deterministic (name-sorted) order.
+func All() []Connector {
+       mu.Lock()
+       defer mu.Unlock()
+       names := make([]string, 0, len(registry))
+       for n := range registry {
+               names = append(names, n)
+       }
+       sort.Strings(names)
+       out := make([]Connector, 0, len(names))
+       for _, n := range names {
+               out = append(out, registry[n])
+       }
+       return out
+}

Reply via email to