This is an automated email from the ASF dual-hosted git repository. danoliveira pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 5fa6c3e [BEAM-11097] Add SideInputCache to harness control type (#15530)
5fa6c3e is described below

commit 5fa6c3ec1d3bb5af4cb4a25fa96ef795028a508b
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Wed Sep 22 16:24:17 2021 -0400

    [BEAM-11097] Add SideInputCache to harness control type (#15530)

    * [BEAM-11097] Add SideInputCache to harness control type

    * [BEAM-11097] Add comment indication cache is in-progress
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 5c5b7b2..a46f2c3 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -26,6 +26,7 @@ import (

 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -35,6 +36,9 @@ import (
 	"google.golang.org/grpc"
 )

+// This side input cache size is a placeholder value.
+const cacheSize = 20
+
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).

 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
@@ -92,6 +96,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 		log.Debugf(ctx, "control response channel closed")
 	}()

+	sideCache := statecache.SideInputCache{}
+	sideCache.Init(cacheSize)
+
 	ctrl := &control{
 		lookupDesc: lookupDesc,
 		descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor),
@@ -102,6 +109,7 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 		failed: make(map[instructionID]error),
 		data: &DataChannelManager{},
 		state: &StateChannelManager{},
+		cache: &sideCache,
 	}

 	// gRPC requires all readers of a stream be the same goroutine, so this goroutine
@@ -231,6 +239,8 @@ type control struct {

 	data *DataChannelManager
 	state *StateChannelManager
+	// TODO(BEAM-11097): Cache is currently unused.
+	cache *statecache.SideInputCache
 }

 func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {