This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 28c3d1c  [BEAM-13224][Playground] Add using context to local cache to stop goroutine
     new 93e5e61  Merge pull request #15959 from [BEAM-13224][Playground] Local cache creates an forever used goroutine
28c3d1c is described below

commit 28c3d1c65fd0a2b3d1e8b5cc8c63a7dc71469c1a
Author: AydarZaynutdinov <aydar.zaynutdi...@akvelon.com>
AuthorDate: Fri Nov 12 13:36:53 2021 +0300

    [BEAM-13224][Playground]
    Add using context to local cache to stop goroutine
---
 .../backend/internal/cache/local/local_cache.go    | 23 +++++++++++++---------
 .../internal/cache/local/local_cache_test.go       |  8 +++++++-
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/playground/backend/internal/cache/local/local_cache.go b/playground/backend/internal/cache/local/local_cache.go
index fb78bba..6598d09 100644
--- a/playground/backend/internal/cache/local/local_cache.go
+++ b/playground/backend/internal/cache/local/local_cache.go
@@ -44,7 +44,7 @@ func New(ctx context.Context) *Cache {
                pipelinesExpiration: pipelinesExpiration,
        }
 
-       go ls.startGC()
+       go ls.startGC(ctx)
        return ls
 
 }
@@ -99,16 +99,21 @@ func (lc *Cache) SetExpTime(ctx context.Context, pipelineId uuid.UUID, expTime t
        return nil
 }
 
-func (lc *Cache) startGC() {
+func (lc *Cache) startGC(ctx context.Context) {
+       ticker := time.NewTicker(lc.cleanupInterval)
        for {
-               <-time.After(lc.cleanupInterval)
-
-               if lc.items == nil {
+               select {
+               case <-ctx.Done():
+                       ticker.Stop()
                        return
-               }
-
-               if pipelines := lc.expiredPipelines(); len(pipelines) != 0 {
-                       lc.clearItems(pipelines)
+               case <-ticker.C:
+                       if lc.items == nil {
+                               return
+                       }
+
+                       if pipelines := lc.expiredPipelines(); len(pipelines) != 0 {
+                               lc.clearItems(pipelines)
+                       }
                }
        }
 }
diff --git a/playground/backend/internal/cache/local/local_cache_test.go b/playground/backend/internal/cache/local/local_cache_test.go
index 4964dbe..8bdb45d 100644
--- a/playground/backend/internal/cache/local/local_cache_test.go
+++ b/playground/backend/internal/cache/local/local_cache_test.go
@@ -19,6 +19,7 @@ import (
        "beam.apache.org/playground/backend/internal/cache"
        "context"
        "github.com/google/uuid"
+       "go.uber.org/goleak"
        "reflect"
        "testing"
        "time"
@@ -228,6 +229,11 @@ func TestLocalCache_SetExpTime(t *testing.T) {
 }
 
 func TestLocalCache_startGC(t *testing.T) {
+       defer goleak.VerifyNone(t)
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        preparedId, _ := uuid.NewUUID()
        preparedItemsMap := make(map[uuid.UUID]map[cache.SubKey]interface{})
        preparedItemsMap[preparedId] = make(map[cache.SubKey]interface{})
@@ -260,7 +266,7 @@ func TestLocalCache_startGC(t *testing.T) {
                                items:               tt.fields.items,
                                pipelinesExpiration: tt.fields.pipelinesExpiration,
                        }
-                       go lc.startGC()
+                       go lc.startGC(ctx)
                        time.Sleep(time.Millisecond)
                        if len(tt.fields.items) != 0 {
                                t.Errorf("Pipeline: %s not deleted in time.", preparedId)

Reply via email to