This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch access-integration-v3
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 2666a188d8ccb84a4c96c68ac000965c8657278f
Author: lahiruj <[email protected]>
AuthorDate: Wed May 20 18:20:11 2026 -0400

    Add MigrateConnectorFS for per-connector schema isolation
---
 cmd/server/main.go     |  2 ++
 internal/db/migrate.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index e2cc07006..34d12bbd9 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -66,6 +66,8 @@ func run() error {
        }
        defer database.Close()
 
+       // Core schema must run before connector migrations because connector
+       // schemas may FK into core tables.
        if err := db.MigrateEmbedded(database); err != nil {
                return err
        }
diff --git a/internal/db/migrate.go b/internal/db/migrate.go
index 446c26b9e..f8a53cfea 100644
--- a/internal/db/migrate.go
+++ b/internal/db/migrate.go
@@ -20,7 +20,9 @@ package db
 import (
        "errors"
        "fmt"
+       "io/fs"
        "log/slog"
+       "regexp"
 
        "github.com/golang-migrate/migrate/v4"
        "github.com/golang-migrate/migrate/v4/database/mysql"
@@ -53,3 +55,56 @@ func MigrateEmbedded(database *sqlx.DB) error {
        slog.Info("database migrations applied successfully")
        return nil
 }
+
+// connectorNamePattern restricts connector names to a safe identifier shape so
+// the value can be concatenated into a SQL table name 
(schema_migrations_<name>)
+// without escaping concerns.
+var connectorNamePattern = regexp.MustCompile(`^[a-z][a-z0-9_]{0,30}$`)
+
+// MigrateConnectorFS applies pending migrations from src against the database,
+// tracking version state in schema_migrations_<name> so each connector
+// versions its schema independently of core and of other connectors.
+func MigrateConnectorFS(database *sqlx.DB, src fs.FS, dir, name string) error {
+       if database == nil {
+               return errors.New("database is required")
+       }
+       if src == nil {
+               return errors.New("migration source is required")
+       }
+       if dir == "" {
+               return errors.New("migration directory is required")
+       }
+       if !connectorNamePattern.MatchString(name) {
+               return fmt.Errorf("connector name %q is invalid; must match 
%s", name, connectorNamePattern)
+       }
+
+       driver, err := mysql.WithInstance(database.DB, &mysql.Config{
+               MigrationsTable: "schema_migrations_" + name,
+       })
+       if err != nil {
+               return fmt.Errorf("create migration driver for %s: %w", name, 
err)
+       }
+
+       source, err := iofs.New(src, dir)
+       if err != nil {
+               return fmt.Errorf("create migration source for %s: %w", name, 
err)
+       }
+
+       m, err := migrate.NewWithInstance("iofs", source, "mysql", driver)
+       if err != nil {
+               return fmt.Errorf("create migrator for %s: %w", name, err)
+       }
+
+       before, _, _ := m.Version()
+       upErr := m.Up()
+       after, _, _ := m.Version()
+       if upErr != nil && !errors.Is(upErr, migrate.ErrNoChange) {
+               return fmt.Errorf("run migrations for %s: %w", name, upErr)
+       }
+       if errors.Is(upErr, migrate.ErrNoChange) {
+               slog.Info("connector migrations already up to date", 
"connector", name, "version", after)
+               return nil
+       }
+       slog.Info("connector migrations applied", "connector", name, "from", 
before, "to", after)
+       return nil
+}

Reply via email to