This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b44d3637c8 [Playground] defer Stop tickers to avoid leaks. (#26507)
5b44d3637c8 is described below

commit 5b44d3637c82d8844515e701fa14a065d7849015
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Wed May 3 16:39:43 2023 -0700

    [Playground] defer Stop tickers to avoid leaks. (#26507)
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 playground/backend/internal/cache/local/local_cache.go         | 1 +
 playground/backend/internal/code_processing/code_processing.go | 6 +++---
 playground/backend/internal/db/datastore/emulator_wrapper.go   | 2 ++
 playground/backend/internal/emulators/kafka.go                 | 7 ++++---
 4 files changed, 10 insertions(+), 6 deletions(-)

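Background on the fix (not part of the commit itself): a ticker created with time.NewTicker keeps its underlying timer firing, and the garbage collector does not recover it until Stop is called, so a polling loop that returns without stopping the ticker leaks it. The commit's approach is to defer ticker.Stop() right after creation so every return path releases it. A minimal illustrative sketch of that pattern (the function and its parameters are hypothetical, not Playground code):

    // Illustrative sketch only; pollUntilDone and its parameters are made up.
    func pollUntilDone(ctx context.Context, interval time.Duration, check func() bool) {
            ticker := time.NewTicker(interval)
            // Deferring Stop releases the ticker on every return path below,
            // which is the leak this commit fixes in the Playground backend.
            defer ticker.Stop()
            for {
                    select {
                    case <-ctx.Done():
                            return
                    case <-ticker.C:
                            if check() {
                                    return
                            }
                    }
            }
    }
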
diff --git a/playground/backend/internal/cache/local/local_cache.go b/playground/backend/internal/cache/local/local_cache.go
index 72568540710..881b6b523d2 100644
--- a/playground/backend/internal/cache/local/local_cache.go
+++ b/playground/backend/internal/cache/local/local_cache.go
@@ -166,6 +166,7 @@ func (lc *Cache) GetSdkCatalog(_ context.Context) ([]*entity.SDKEntity, error) {
 
 func (lc *Cache) startGC(ctx context.Context) {
        ticker := time.NewTicker(lc.cleanupInterval)
+       defer ticker.Stop()
        for {
                select {
                case <-ctx.Done():
diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go
index be4c88709ba..d371860430e 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -434,10 +434,10 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uu
 // If cancel flag exists, and it is true it means that the code processing was canceled. Set true to cancelChannel and return.
 func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc context.CancelFunc, cacheService cache.Cache) {
        ticker := time.NewTicker(pauseDuration)
+       defer ticker.Stop()
        for {
                select {
                case <-ctx.Done():
-                       ticker.Stop()
                        return
                case <-ticker.C:
                        // Use background context for the cache operation to avoid failure when the main context is timed out.
@@ -460,12 +460,12 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc context.C
 // In other case each pauseDuration checks that graph file exists or not and try to save it to the cache.
 func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
        ticker := time.NewTicker(pauseDuration)
+       defer ticker.Stop()
        for {
                select {
                // waiting when graph file appears
                case <-ticker.C:
                        if _, err := os.Stat(graphFilePath); err == nil {
-                               ticker.Stop()
                                graph, err := os.ReadFile(graphFilePath)
                                if err != nil {
                                        logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
@@ -474,7 +474,6 @@ func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cach
                        }
                // in case of timeout or cancel
                case <-pipelineLifeCycleCtx.Done():
-                       ticker.Stop()
                        if _, err := os.Stat(graphFilePath); err == nil {
                                graph, err := os.ReadFile(graphFilePath)
                                if err != nil {
@@ -497,6 +496,7 @@ func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cach
 // In other case each pauseDuration write to cache logs of the code processing.
 func readLogFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
        ticker := time.NewTicker(pauseDuration)
+       defer ticker.Stop()
        for {
                select {
                // in case of timeout or cancel
diff --git a/playground/backend/internal/db/datastore/emulator_wrapper.go b/playground/backend/internal/db/datastore/emulator_wrapper.go
index 9252c454b01..f1cf0206b66 100644
--- a/playground/backend/internal/db/datastore/emulator_wrapper.go
+++ b/playground/backend/internal/db/datastore/emulator_wrapper.go
@@ -130,7 +130,9 @@ func startEmulator() (*emulator, error) {
 
                ok := func() bool {
                        workTicker := time.NewTicker(pauseDuration)
+                       defer workTicker.Stop()
                        globalTicker := time.NewTicker(waitDuration)
+                       defer globalTicker.Stop()
                        for {
                                select {
                                case <-workTicker.C:
diff --git a/playground/backend/internal/emulators/kafka.go b/playground/backend/internal/emulators/kafka.go
index 58112439c67..031373b47b0 100644
--- a/playground/backend/internal/emulators/kafka.go
+++ b/playground/backend/internal/emulators/kafka.go
@@ -16,7 +16,6 @@
 package emulators
 
 import (
-       "beam.apache.org/playground/backend/internal/logger"
        "bufio"
        "encoding/json"
        "errors"
@@ -28,10 +27,10 @@ import (
        "strings"
        "time"
 
+       "beam.apache.org/playground/backend/internal/constants"
+       "beam.apache.org/playground/backend/internal/logger"
        "github.com/confluentinc/confluent-kafka-go/kafka"
        "github.com/linkedin/goavro"
-
-       "beam.apache.org/playground/backend/internal/constants"
 )
 
 const (
@@ -109,7 +108,9 @@ func NewKafkaMockCluster(emulatorExecutablePath string) (*KafkaMockCluster, erro
        bootstrapServers := fmt.Sprintf("%s%s%s", host, addressSeperator, port)
 
        workTicker := time.NewTicker(pauseDuration)
+       defer workTicker.Stop()
        globalTicker := time.NewTicker(globalDuration)
+       defer globalTicker.Stop()
        for {
                select {
                case <-workTicker.C:

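The emulator_wrapper.go and kafka.go hunks apply the same fix to loops that use two tickers: a short work ticker that polls for readiness plus a longer global ticker that bounds the overall wait. A minimal sketch of that shape with both tickers deferred (the name and signature are hypothetical, not the actual Playground API):

    // Illustrative sketch only; waitForReady is made up for this example.
    func waitForReady(ready func() bool, pause, limit time.Duration) bool {
            workTicker := time.NewTicker(pause)
            defer workTicker.Stop()
            globalTicker := time.NewTicker(limit)
            defer globalTicker.Stop()
            for {
                    select {
                    case <-workTicker.C: // poll until the dependency reports ready
                            if ready() {
                                    return true
                            }
                    case <-globalTicker.C: // give up once the overall deadline passes
                            return false
                    }
            }
    }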