This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6e6ff61 [BEAM-9789] Fix lock error. Add test. (#11468) 6e6ff61 is described below commit 6e6ff6157904498ceeca4931648168d793721a6e Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Tue Apr 21 13:52:03 2020 -0700 [BEAM-9789] Fix lock error. Add test. (#11468) --- sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 +- .../pkg/beam/core/runtime/harness/harness_test.go | 165 +++++++++++++++++++++ 2 files changed, 173 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 9d35a0e..26cd02e 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -57,7 +57,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { client := fnpb.NewBeamFnControlClient(conn) lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) { - return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)}) + pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)}) + log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err) + return pbd, err } stub, err := client.Control(ctx) @@ -170,18 +172,19 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) { } else { desc, ok := c.descriptors[bdID] if !ok { - c.mu.Unlock() + c.mu.Unlock() // Unlock to make the lookup. newDesc, err := c.lookupDesc(bdID) - c.mu.Lock() if err != nil { - return nil, fmt.Errorf("execution plan for %v not found: %v", bdID, err) + return nil, errors.Wrapf(err, "execution plan for %v not found", bdID) } + c.mu.Lock() c.descriptors[bdID] = newDesc desc = newDesc } newPlan, err := exec.UnmarshalPlan(desc) if err != nil { - return nil, fmt.Errorf("Invalid bundle desc: %v", err) + c.mu.Unlock() + return nil, errors.Wrapf(err, "invalid bundle desc %v: %v", bdID, desc) } plan = newPlan } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go new file mode 100644 index 0000000..3afde8e --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package harness + +import ( + "fmt" + "strings" + "testing" + + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "github.com/golang/protobuf/proto" +) + +// validDescriptor describes a valid pipeline with a source and a sink, but doesn't do anything else. +func validDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor { + t.Helper() + port := &fnpb.RemoteGrpcPort{ + CoderId: "c1", + ApiServiceDescriptor: &pipepb.ApiServiceDescriptor{ + Url: "hostname:port", + }, + } + portBytes, err := proto.Marshal(port) + if err != nil { + t.Fatalf("bad port: %v", err) + } + return &fnpb.ProcessBundleDescriptor{ + Id: "test", + Transforms: map[string]*pipepb.PTransform{ + "source": &pipepb.PTransform{ + Spec: &pipepb.FunctionSpec{ + Urn: "beam:runner:source:v1", + Payload: portBytes, + }, + Outputs: map[string]string{ + "o1": "p1", + }, + }, + "sink": &pipepb.PTransform{ + Spec: &pipepb.FunctionSpec{ + Urn: "beam:runner:sink:v1", + Payload: portBytes, + }, + Inputs: map[string]string{ + "i1": "p1", + }, + }, + }, + Pcollections: map[string]*pipepb.PCollection{ + "p1": &pipepb.PCollection{ + CoderId: "c1", + }, + }, + Coders: map[string]*pipepb.Coder{ + "c1": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: "beam:coder:windowed_value:v1", + }, + ComponentCoderIds: []string{"c2", "c3"}, + }, + "c2": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: "beam:coder:varint:v1", + }, + }, + "c3": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: "beam:coder:global_window:v1", + }, + }, + }, + } + +} + +func invalidDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor { + return &fnpb.ProcessBundleDescriptor{} +} + +func TestControl_getOrCreatePlan(t *testing.T) { + testBDID := bundleDescriptorID("test") + testPlan, err := exec.UnmarshalPlan(validDescriptor(t)) + if err != nil { + t.Fatal("bad testPlan") + } + tests := []struct { + name string + lookupErr, planErr error + lookupDesc *fnpb.ProcessBundleDescriptor + descriptors map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor + plans map[bundleDescriptorID][]*exec.Plan + }{ + { + name: "OK", + lookupDesc: validDescriptor(t), + }, { + name: "cachedDescriptor", + descriptors: map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor{ + testBDID: validDescriptor(t), + }, + }, { + name: "cachedPlan", + plans: map[bundleDescriptorID][]*exec.Plan{ + testBDID: []*exec.Plan{testPlan}, + }, + }, { + name: "badLookup", + lookupErr: fmt.Errorf("lookupError"), + }, { + name: "badDescriptorPlan", + lookupDesc: invalidDescriptor(t), + planErr: fmt.Errorf("invalid bundle desc"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctrl := &control{ + lookupDesc: func(bdID bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) { + return test.lookupDesc, test.lookupErr + }, + descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor), + plans: make(map[bundleDescriptorID][]*exec.Plan), + active: make(map[instructionID]*exec.Plan), + failed: make(map[instructionID]error), + } + if test.descriptors != nil { + ctrl.descriptors = test.descriptors + } + if test.plans != nil { + ctrl.plans = test.plans + } + if test.planErr == nil { + test.planErr = test.lookupErr + } + + plan, err := ctrl.getOrCreatePlan(testBDID) + if err != nil { + if plan != nil { + t.Error("getOrCreatePlan returned a non-nil error and non-nil plan. Non-nil errors must have nil plans.") + } + if got, want := err.Error(), test.planErr.Error(); !strings.Contains(got, want) { + t.Errorf("getOrCreatePlan errored: got %q, want to contain %q", got, want) + } + } + + }) + } + +}