This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f051cd9 [BEAM-11097] Add SideInputCache to StateReader (#15563) f051cd9 is described below commit f051cd91d46e5dab0ca48f108b27d9d87e6e5e8f Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com> AuthorDate: Fri Sep 24 18:45:23 2021 -0400 [BEAM-11097] Add SideInputCache to StateReader (#15563) --- sdks/go/pkg/beam/core/runtime/exec/data.go | 4 ++++ sdks/go/pkg/beam/core/runtime/harness/harness.go | 2 +- .../core/runtime/harness/statecache/statecache.go | 22 +++++++++++++++++----- .../runtime/harness/statecache/statecache_test.go | 3 +-- sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 15 ++++++++++++++- 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/data.go b/sdks/go/pkg/beam/core/runtime/exec/data.go index 2b00a37..f891c9c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/data.go +++ b/sdks/go/pkg/beam/core/runtime/exec/data.go @@ -19,6 +19,8 @@ import ( "context" "fmt" "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" ) // Port represents the connection port of external operations. @@ -59,6 +61,8 @@ type StateReader interface { OpenSideInput(ctx context.Context, id StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error) // OpenIterable opens a byte stream for reading unwindowed iterables from the runner. OpenIterable(ctx context.Context, id StreamID, key []byte) (io.ReadCloser, error) + // GetSideInputCache returns the SideInputCache being used at the harness level. 
+ GetSideInputCache() *statecache.SideInputCache } // TODO(herohde) 7/20/2018: user state management diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index a46f2c3..09c1847 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -318,7 +318,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } data := NewScopedDataManager(c.data, instID) - state := NewScopedStateReader(c.state, instID) + state := NewScopedStateReaderWithCache(c.state, instID, c.cache) err = plan.Execute(ctx, string(instID), exec.DataContext{Data: data, State: state}) data.Close() state.Close() diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 5496d8b..99a093d 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -23,13 +23,25 @@ package statecache import ( "sync" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" ) type token string +// ReusableInput is a resettable value, notably used to unwind iterators cheaply +// and cache materialized side input across invocations. +// +// Redefined from exec's input.go to avoid a cyclical dependency. +type ReusableInput interface { + // Init initializes the value before use. + Init() error + // Value returns the side input value. + Value() interface{} + // Reset resets the value after use. + Reset() error +} + // SideInputCache stores a cache of reusable inputs for the purposes of // eliminating redundant calls to the runner during execution of ParDos // using side inputs. 
@@ -44,7 +56,7 @@ type token string type SideInputCache struct { capacity int mu sync.Mutex - cache map[token]exec.ReusableInput + cache map[token]ReusableInput idsToTokens map[string]token validTokens map[token]int8 // Maps tokens to active bundle counts metrics CacheMetrics @@ -66,7 +78,7 @@ func (c *SideInputCache) Init(cap int) error { } c.mu.Lock() defer c.mu.Unlock() - c.cache = make(map[token]exec.ReusableInput, cap) + c.cache = make(map[token]ReusableInput, cap) c.idsToTokens = make(map[string]token) c.validTokens = make(map[token]int8) c.capacity = cap @@ -148,7 +160,7 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) ( // input has been cached. A query having a bad token (e.g. one that doesn't make a known // token or one that makes a known but currently invalid token) is treated the same as a // cache miss. -func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReusableInput { +func (c *SideInputCache) QueryCache(transformID, sideInputID string) ReusableInput { c.mu.Lock() defer c.mu.Unlock() tok, ok := c.makeAndValidateToken(transformID, sideInputID) @@ -170,7 +182,7 @@ func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.Reusab // with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token // then we silently do not cache the input, as this is an indication that the runner is treating that input // as uncacheable. 
-func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReusableInput) { +func (c *SideInputCache) SetCache(transformID, sideInputID string, input ReusableInput) { c.mu.Lock() defer c.mu.Unlock() tok, ok := c.makeAndValidateToken(transformID, sideInputID) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index b9970c3..18f2f38 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -18,7 +18,6 @@ package statecache import ( "testing" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" ) @@ -30,7 +29,7 @@ type TestReusableInput struct { value interface{} } -func makeTestReusableInput(transformID, sideInputID string, value interface{}) exec.ReusableInput { +func makeTestReusableInput(transformID, sideInputID string, value interface{}) ReusableInput { return &TestReusableInput{transformID: transformID, sideInputID: sideInputID, value: value} } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index 09daa11..31d75b6 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -39,11 +40,18 @@ type ScopedStateReader struct { opened []io.Closer // track open readers to force close all closed bool mu sync.Mutex + + cache *statecache.SideInputCache } // NewScopedStateReader returns a 
ScopedStateReader for the given instruction. func NewScopedStateReader(mgr *StateChannelManager, instID instructionID) *ScopedStateReader { - return &ScopedStateReader{mgr: mgr, instID: instID} + return &ScopedStateReader{mgr: mgr, instID: instID, cache: nil} +} + +// NewScopedStateReaderWithCache returns a ScopedStateReader for the given instruction with a pointer to a SideInputCache. +func NewScopedStateReaderWithCache(mgr *StateChannelManager, instID instructionID, cache *statecache.SideInputCache) *ScopedStateReader { + return &ScopedStateReader{mgr: mgr, instID: instID, cache: cache} } // OpenSideInput opens a byte stream for reading iterable side input. @@ -60,6 +68,11 @@ func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, }) } +// GetSideInputCache returns a pointer to the SideInputCache being used by the SDK harness. +func (s *ScopedStateReader) GetSideInputCache() *statecache.SideInputCache { + return s.cache +} + func (s *ScopedStateReader) openReader(ctx context.Context, id exec.StreamID, readerFn func(*StateChannel) *stateKeyReader) (*stateKeyReader, error) { ch, err := s.open(ctx, id.Port) if err != nil {