This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fa6c3e  [BEAM-11097] Add SideInputCache to harness control type 
(#15530)
5fa6c3e is described below

commit 5fa6c3ec1d3bb5af4cb4a25fa96ef795028a508b
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Wed Sep 22 16:24:17 2021 -0400

    [BEAM-11097] Add SideInputCache to harness control type (#15530)
    
    * [BEAM-11097] Add SideInputCache to harness control type
    * [BEAM-11097] Add comment indication cache is in-progress
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 5c5b7b2..a46f2c3 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -35,6 +36,9 @@ import (
        "google.golang.org/grpc"
 )
 
+// This side input cache size is a placeholder value.
+const cacheSize = 20
+
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a 
plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
@@ -92,6 +96,9 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                log.Debugf(ctx, "control response channel closed")
        }()
 
+       sideCache := statecache.SideInputCache{}
+       sideCache.Init(cacheSize)
+
        ctrl := &control{
                lookupDesc:  lookupDesc,
                descriptors: 
make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor),
@@ -102,6 +109,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                failed:      make(map[instructionID]error),
                data:        &DataChannelManager{},
                state:       &StateChannelManager{},
+               cache:       &sideCache,
        }
 
        // gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
@@ -231,6 +239,8 @@ type control struct {
 
        data  *DataChannelManager
        state *StateChannelManager
+       // TODO(BEAM-11097): Cache is currently unused.
+       cache *statecache.SideInputCache
 }
 
 func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) 
{

Reply via email to