This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch access-integration-v3
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 8b4a0b478f0bafc321453cbb37d8a4b7aff5e7e4
Author: lahiruj <[email protected]>
AuthorDate: Thu May 21 03:37:21 2026 -0400

    Wait for connector goroutines to drain on shutdown
---
 cmd/server/main.go                                   | 20 +++++++++++++++++++-
 connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go  | 20 ++++++++++----------
 .../SLURM/Association-Mapper/pkg/smapper/loader.go   |  3 ++-
 internal/connectors/loader.go                        |  7 ++++---
 4 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 6c3a96037..14a954b30 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -26,6 +26,7 @@ import (
        "os"
        "os/signal"
        "strconv"
+       "sync"
        "syscall"
        "time"
 
@@ -79,7 +80,10 @@ func run() error {
        ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, 
syscall.SIGTERM)
        defer stop()
 
-       if err := connectors.LoadConnectors(ctx, database, eventBus, svc); err 
!= nil {
+       // Tracks every background goroutine spawned by connectors so we can 
wait
+       // for them to drain on shutdown instead of killing them mid-flight.
+       var connectorsWG sync.WaitGroup
+       if err := connectors.LoadConnectors(ctx, database, eventBus, svc, 
&connectorsWG); err != nil {
                return err
        }
 
@@ -117,6 +121,20 @@ func run() error {
        if err := httpServer.Shutdown(shutdownCtx); err != nil {
                return err
        }
+
+       slog.Info("waiting for connectors to drain")
+       connectorsDone := make(chan struct{})
+       go func() {
+               connectorsWG.Wait()
+               close(connectorsDone)
+       }()
+       select {
+       case <-connectorsDone:
+               slog.Info("connectors drained cleanly")
+       case <-time.After(30 * time.Second):
+               slog.Warn("connector drain timed out; some workers may have 
leaked")
+       }
+
        slog.Info("server stopped cleanly")
        return nil
 }
diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go 
b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
index e6dc0d5d5..7dcc52c7d 100644
--- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
+++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
@@ -43,10 +43,9 @@ import (
 
 const connectorName = "amie"
 
-// LoadConnector applies the connector's migrations and starts its background
-// workers. It returns nil and skips when AMIE_BASE_URL / AMIE_SITE_CODE /
+// LoadConnector logs a warning and skips when AMIE_BASE_URL / AMIE_SITE_CODE /
 // AMIE_API_KEY are not all set.
-func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *coreservice.Service) error {
+func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup) error {
        cfg := loadConfig()
        if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode 
== "" {
                slog.Warn("AMIE credentials not fully provided, skipping AMIE 
connector")
@@ -92,15 +91,16 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, 
eventBus *events.Bus,
        poller := worker.NewPoller(amie, packetStore, eventStore, met, 
database, cfg.AMIE)
        processor := worker.NewProcessor(eventStore, packetStore, errorStore, 
router, met, database, cfg.AMIE)
 
-       var wg sync.WaitGroup
        wg.Add(2)
-       go func() { defer wg.Done(); poller.Run(ctx) }()
-       go func() { defer wg.Done(); processor.Run(ctx) }()
-
        go func() {
-               <-ctx.Done()
-               wg.Wait()
-               slog.Info("AMIE connector stopped")
+               defer wg.Done()
+               poller.Run(ctx)
+               slog.Info("AMIE poller stopped")
+       }()
+       go func() {
+               defer wg.Done()
+               processor.Run(ctx)
+               slog.Info("AMIE processor stopped")
        }()
 
        slog.Info("AMIE connector started", "site", cfg.AMIE.SiteCode, 
"baseURL", cfg.AMIE.BaseURL)
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go 
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 7efd4b29a..9eee2ae74 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -4,6 +4,7 @@ import (
        "context"
        "log/slog"
        "os"
+       "sync"
 
        "github.com/jmoiron/sqlx"
 
@@ -13,7 +14,7 @@ import (
        "github.com/apache/airavata-custos/pkg/service"
 )
 
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service) error {
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup) error {
 
        // Read url, username, and password from environment variables
        apiUrl := os.Getenv("SLURM_API")
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 0632fd7eb..99fb221c0 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -20,6 +20,7 @@ package connectors
 import (
        "context"
        "log/slog"
+       "sync"
 
        "github.com/jmoiron/sqlx"
 
@@ -29,17 +30,17 @@ import (
        "github.com/apache/airavata-custos/pkg/service"
 )
 
-func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *service.Service) error {
+func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *service.Service, wg *sync.WaitGroup) error {
        slog.Info("loading connectors")
 
        slog.Info("loading SLURM Association Mapper connector")
-       if err := smapper.LoadConnector(ctx, database, eventBus, coreService); 
err != nil {
+       if err := smapper.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
                slog.Error("failed to load SLURM Association Mapper connector", 
"error", err)
                return err
        }
 
        slog.Info("loading AMIE connector")
-       if err := amie.LoadConnector(ctx, database, eventBus, coreService); err 
!= nil {
+       if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg); 
err != nil {
                slog.Error("failed to load AMIE connector", "error", err)
                return err
        }

Reply via email to