This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 28c3d1c [BEAM-13224][Playground] Add using context to local cache to stop goroutine new 93e5e61 Merge pull request #15959 from [BEAM-13224][Playground] Local cache creates an forever used goroutine 28c3d1c is described below commit 28c3d1c65fd0a2b3d1e8b5cc8c63a7dc71469c1a Author: AydarZaynutdinov <aydar.zaynutdi...@akvelon.com> AuthorDate: Fri Nov 12 13:36:53 2021 +0300 [BEAM-13224][Playground] Add using context to local cache to stop goroutine --- .../backend/internal/cache/local/local_cache.go | 23 +++++++++++++--------- .../internal/cache/local/local_cache_test.go | 8 +++++++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/playground/backend/internal/cache/local/local_cache.go b/playground/backend/internal/cache/local/local_cache.go index fb78bba..6598d09 100644 --- a/playground/backend/internal/cache/local/local_cache.go +++ b/playground/backend/internal/cache/local/local_cache.go @@ -44,7 +44,7 @@ func New(ctx context.Context) *Cache { pipelinesExpiration: pipelinesExpiration, } - go ls.startGC() + go ls.startGC(ctx) return ls } @@ -99,16 +99,21 @@ func (lc *Cache) SetExpTime(ctx context.Context, pipelineId uuid.UUID, expTime t return nil } -func (lc *Cache) startGC() { +func (lc *Cache) startGC(ctx context.Context) { + ticker := time.NewTicker(lc.cleanupInterval) for { - <-time.After(lc.cleanupInterval) - - if lc.items == nil { + select { + case <-ctx.Done(): + ticker.Stop() return - } - - if pipelines := lc.expiredPipelines(); len(pipelines) != 0 { - lc.clearItems(pipelines) + case <-ticker.C: + if lc.items == nil { + return + } + + if pipelines := lc.expiredPipelines(); len(pipelines) != 0 { + lc.clearItems(pipelines) + } } } } diff --git a/playground/backend/internal/cache/local/local_cache_test.go b/playground/backend/internal/cache/local/local_cache_test.go index 4964dbe..8bdb45d 100644 --- a/playground/backend/internal/cache/local/local_cache_test.go +++ b/playground/backend/internal/cache/local/local_cache_test.go @@ -19,6 +19,7 @@ import ( "beam.apache.org/playground/backend/internal/cache" "context" "github.com/google/uuid" + "go.uber.org/goleak" "reflect" "testing" "time" @@ -228,6 +229,11 @@ func TestLocalCache_SetExpTime(t *testing.T) { } func TestLocalCache_startGC(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + preparedId, _ := uuid.NewUUID() preparedItemsMap := make(map[uuid.UUID]map[cache.SubKey]interface{}) preparedItemsMap[preparedId] = make(map[cache.SubKey]interface{}) @@ -260,7 +266,7 @@ func TestLocalCache_startGC(t *testing.T) { items: tt.fields.items, pipelinesExpiration: tt.fields.pipelinesExpiration, } - go lc.startGC() + go lc.startGC(ctx) time.Sleep(time.Millisecond) if len(tt.fields.items) != 0 { t.Errorf("Pipeline: %s not deleted in time.", preparedId)