This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f051cd9  [BEAM-11097] Add SideInputCache to StateReader (#15563)
f051cd9 is described below

commit f051cd91d46e5dab0ca48f108b27d9d87e6e5e8f
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Fri Sep 24 18:45:23 2021 -0400

    [BEAM-11097] Add SideInputCache to StateReader (#15563)
---
 sdks/go/pkg/beam/core/runtime/exec/data.go         |  4 ++++
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  2 +-
 .../core/runtime/harness/statecache/statecache.go  | 22 +++++++++++++++++-----
 .../runtime/harness/statecache/statecache_test.go  |  3 +--
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go  | 15 ++++++++++++++-
 5 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/data.go b/sdks/go/pkg/beam/core/runtime/exec/data.go
index 2b00a37..f891c9c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/data.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/data.go
@@ -19,6 +19,8 @@ import (
        "context"
        "fmt"
        "io"
+
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
 )
 
 // Port represents the connection port of external operations.
@@ -59,6 +61,8 @@ type StateReader interface {
        OpenSideInput(ctx context.Context, id StreamID, sideInputID string, 
key, w []byte) (io.ReadCloser, error)
        // OpenIterable opens a byte stream for reading unwindowed iterables 
from the runner.
        OpenIterable(ctx context.Context, id StreamID, key []byte) 
(io.ReadCloser, error)
+       // GetSideInputCache returns the SideInputCache being used at the 
harness level.
+       GetSideInputCache() *statecache.SideInputCache
 }
 
 // TODO(herohde) 7/20/2018: user state management
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index a46f2c3..09c1847 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -318,7 +318,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                }
 
                data := NewScopedDataManager(c.data, instID)
-               state := NewScopedStateReader(c.state, instID)
+               state := NewScopedStateReaderWithCache(c.state, instID, c.cache)
                err = plan.Execute(ctx, string(instID), exec.DataContext{Data: 
data, State: state})
                data.Close()
                state.Close()
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
index 5496d8b..99a093d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
@@ -23,13 +23,25 @@ package statecache
 import (
        "sync"
 
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 )
 
 type token string
 
+// ReusableInput is a resettable value, notably used to unwind iterators 
cheaply
+// and cache materialized side input across invocations.
+//
+// Redefined from exec's input.go to avoid a cyclical dependency.
+type ReusableInput interface {
+       // Init initializes the value before use.
+       Init() error
+       // Value returns the side input value.
+       Value() interface{}
+       // Reset resets the value after use.
+       Reset() error
+}
+
 // SideInputCache stores a cache of reusable inputs for the purposes of
 // eliminating redundant calls to the runner during execution of ParDos
 // using side inputs.
@@ -44,7 +56,7 @@ type token string
 type SideInputCache struct {
        capacity    int
        mu          sync.Mutex
-       cache       map[token]exec.ReusableInput
+       cache       map[token]ReusableInput
        idsToTokens map[string]token
        validTokens map[token]int8 // Maps tokens to active bundle counts
        metrics     CacheMetrics
@@ -66,7 +78,7 @@ func (c *SideInputCache) Init(cap int) error {
        }
        c.mu.Lock()
        defer c.mu.Unlock()
-       c.cache = make(map[token]exec.ReusableInput, cap)
+       c.cache = make(map[token]ReusableInput, cap)
        c.idsToTokens = make(map[string]token)
        c.validTokens = make(map[token]int8)
        c.capacity = cap
@@ -148,7 +160,7 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (
 // input has been cached. A query having a bad token (e.g. one that doesn't 
make a known
 // token or one that makes a known but currently invalid token) is treated the 
same as a
 // cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) 
exec.ReusableInput {
+func (c *SideInputCache) QueryCache(transformID, sideInputID string) 
ReusableInput {
        c.mu.Lock()
        defer c.mu.Unlock()
        tok, ok := c.makeAndValidateToken(transformID, sideInputID)
@@ -170,7 +182,7 @@ func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.Reusab
 // with its corresponding transform ID and side input ID. If the IDs do not 
pair with a known, valid token
 // then we silently do not cache the input, as this is an indication that the 
runner is treating that input
 // as uncacheable.
-func (c *SideInputCache) SetCache(transformID, sideInputID string, input 
exec.ReusableInput) {
+func (c *SideInputCache) SetCache(transformID, sideInputID string, input 
ReusableInput) {
        c.mu.Lock()
        defer c.mu.Unlock()
        tok, ok := c.makeAndValidateToken(transformID, sideInputID)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
index b9970c3..18f2f38 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
@@ -18,7 +18,6 @@ package statecache
 import (
        "testing"
 
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 )
 
@@ -30,7 +29,7 @@ type TestReusableInput struct {
        value       interface{}
 }
 
-func makeTestReusableInput(transformID, sideInputID string, value interface{}) 
exec.ReusableInput {
+func makeTestReusableInput(transformID, sideInputID string, value interface{}) 
ReusableInput {
        return &TestReusableInput{transformID: transformID, sideInputID: 
sideInputID, value: value}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 09daa11..31d75b6 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
@@ -39,11 +40,18 @@ type ScopedStateReader struct {
        opened []io.Closer // track open readers to force close all
        closed bool
        mu     sync.Mutex
+
+       cache *statecache.SideInputCache
 }
 
 // NewScopedStateReader returns a ScopedStateReader for the given instruction.
 func NewScopedStateReader(mgr *StateChannelManager, instID instructionID) 
*ScopedStateReader {
-       return &ScopedStateReader{mgr: mgr, instID: instID}
+       return &ScopedStateReader{mgr: mgr, instID: instID, cache: nil}
+}
+
+// NewScopedStateReaderWithCache returns a ScopedState reader for the given 
instruction with a pointer to a SideInputCache.
+func NewScopedStateReaderWithCache(mgr *StateChannelManager, instID 
instructionID, cache *statecache.SideInputCache) *ScopedStateReader {
+       return &ScopedStateReader{mgr: mgr, instID: instID, cache: cache}
 }
 
 // OpenSideInput opens a byte stream for reading iterable side input.
@@ -60,6 +68,11 @@ func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID,
        })
 }
 
+// GetSideInputCache returns a pointer to the SideInputCache being used by the 
SDK harness.
+func (s *ScopedStateReader) GetSideInputCache() *statecache.SideInputCache {
+       return s.cache
+}
+
 func (s *ScopedStateReader) openReader(ctx context.Context, id exec.StreamID, 
readerFn func(*StateChannel) *stateKeyReader) (*stateKeyReader, error) {
        ch, err := s.open(ctx, id.Port)
        if err != nil {

Reply via email to