This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6ff61  [BEAM-9789] Fix lock error. Add test. (#11468)
6e6ff61 is described below

commit 6e6ff6157904498ceeca4931648168d793721a6e
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue Apr 21 13:52:03 2020 -0700

    [BEAM-9789] Fix lock error. Add test. (#11468)
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  13 +-
 .../pkg/beam/core/runtime/harness/harness_test.go  | 165 +++++++++++++++++++++
 2 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 9d35a0e..26cd02e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -57,7 +57,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
        client := fnpb.NewBeamFnControlClient(conn)
 
        lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
-               return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+               pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+               log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
+               return pbd, err
        }
 
        stub, err := client.Control(ctx)
@@ -170,18 +172,19 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
        } else {
                desc, ok := c.descriptors[bdID]
                if !ok {
-                       c.mu.Unlock()
+                       c.mu.Unlock() // Unlock to make the lookup.
                        newDesc, err := c.lookupDesc(bdID)
-                       c.mu.Lock()
                        if err != nil {
-                               return nil, fmt.Errorf("execution plan for %v not found: %v", bdID, err)
+                               return nil, errors.Wrapf(err, "execution plan for %v not found", bdID)
                        }
+                       c.mu.Lock()
                        c.descriptors[bdID] = newDesc
                        desc = newDesc
                }
                newPlan, err := exec.UnmarshalPlan(desc)
                if err != nil {
-                       return nil, fmt.Errorf("Invalid bundle desc: %v", err)
+                       c.mu.Unlock()
+                       return nil, errors.Wrapf(err, "invalid bundle desc %v: %v", bdID, desc)
                }
                plan = newPlan
        }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
new file mode 100644
index 0000000..3afde8e
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+       "fmt"
+       "strings"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/golang/protobuf/proto"
+)
+
+// validDescriptor describes a valid pipeline with a source and a sink, but doesn't do anything else.
+func validDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+       t.Helper()
+       port := &fnpb.RemoteGrpcPort{
+               CoderId: "c1",
+               ApiServiceDescriptor: &pipepb.ApiServiceDescriptor{
+                       Url: "hostname:port",
+               },
+       }
+       portBytes, err := proto.Marshal(port)
+       if err != nil {
+               t.Fatalf("bad port: %v", err)
+       }
+       return &fnpb.ProcessBundleDescriptor{
+               Id: "test",
+               Transforms: map[string]*pipepb.PTransform{
+                       "source": &pipepb.PTransform{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn:     "beam:runner:source:v1",
+                                       Payload: portBytes,
+                               },
+                               Outputs: map[string]string{
+                                       "o1": "p1",
+                               },
+                       },
+                       "sink": &pipepb.PTransform{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn:     "beam:runner:sink:v1",
+                                       Payload: portBytes,
+                               },
+                               Inputs: map[string]string{
+                                       "i1": "p1",
+                               },
+                       },
+               },
+               Pcollections: map[string]*pipepb.PCollection{
+                       "p1": &pipepb.PCollection{
+                               CoderId: "c1",
+                       },
+               },
+               Coders: map[string]*pipepb.Coder{
+                       "c1": &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: "beam:coder:windowed_value:v1",
+                               },
+                               ComponentCoderIds: []string{"c2", "c3"},
+                       },
+                       "c2": &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: "beam:coder:varint:v1",
+                               },
+                       },
+                       "c3": &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: "beam:coder:global_window:v1",
+                               },
+                       },
+               },
+       }
+
+}
+
+func invalidDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+       return &fnpb.ProcessBundleDescriptor{}
+}
+
+func TestControl_getOrCreatePlan(t *testing.T) {
+       testBDID := bundleDescriptorID("test")
+       testPlan, err := exec.UnmarshalPlan(validDescriptor(t))
+       if err != nil {
+               t.Fatal("bad testPlan")
+       }
+       tests := []struct {
+               name               string
+               lookupErr, planErr error
+               lookupDesc         *fnpb.ProcessBundleDescriptor
+               descriptors        map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor
+               plans              map[bundleDescriptorID][]*exec.Plan
+       }{
+               {
+                       name:       "OK",
+                       lookupDesc: validDescriptor(t),
+               }, {
+                       name: "cachedDescriptor",
+                       descriptors: map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor{
+                               testBDID: validDescriptor(t),
+                       },
+               }, {
+                       name: "cachedPlan",
+                       plans: map[bundleDescriptorID][]*exec.Plan{
+                               testBDID: []*exec.Plan{testPlan},
+                       },
+               }, {
+                       name:      "badLookup",
+                       lookupErr: fmt.Errorf("lookupError"),
+               }, {
+                       name:       "badDescriptorPlan",
+                       lookupDesc: invalidDescriptor(t),
+                       planErr:    fmt.Errorf("invalid bundle desc"),
+               },
+       }
+
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       ctrl := &control{
+                               lookupDesc: func(bdID bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
+                                       return test.lookupDesc, test.lookupErr
+                               },
+                               descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor),
+                               plans:       make(map[bundleDescriptorID][]*exec.Plan),
+                               active:      make(map[instructionID]*exec.Plan),
+                               failed:      make(map[instructionID]error),
+                       }
+                       if test.descriptors != nil {
+                               ctrl.descriptors = test.descriptors
+                       }
+                       if test.plans != nil {
+                               ctrl.plans = test.plans
+                       }
+                       if test.planErr == nil {
+                               test.planErr = test.lookupErr
+                       }
+
+                       plan, err := ctrl.getOrCreatePlan(testBDID)
+                       if err != nil {
+                               if plan != nil {
+                                       t.Error("getOrCreatePlan returned a non-nil error and non-nil plan. Non-nil errors must have nil plans.")
+                               }
+                               if got, want := err.Error(), test.planErr.Error(); !strings.Contains(got, want) {
+                                       t.Errorf("getOrCreatePlan errored: got %q, want to contain %q", got, want)
+                               }
+                       }
+
+               })
+       }
+
+}

Reply via email to