This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a571952  [BEAM-12438] Add Regression test for issue around LP coding 
Row coders. (#14924)
a571952 is described below

commit a571952e3ce470b3871ebd333eacb3d6368d2737
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Mon Jun 7 09:45:26 2021 -0700

    [BEAM-12438] Add Regression test for issue around LP coding Row coders. 
(#14924)
    
    * [BEAM-12438] Run regression during integration
    * [BEAM-12438] Add LP error repro
    * [BEAM-12438] Ignore extra LP on injects.
    * [BEAM-12438] Populate schema option types.
    
    Co-authored-by: zelliott
---
 model/pipeline/src/main/proto/schema.proto         |  6 +--
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  9 +++-
 .../pkg/beam/core/runtime/graphx/schema/schema.go  | 56 ++++++++++++++-----
 .../beam/core/runtime/graphx/schema/schema_test.go | 16 ++----
 sdks/go/test/regression/lperror.go                 | 63 ++++++++++++++++++++++
 .../regression/{pardo_test.go => lperror_test.go}  | 46 +++++-----------
 sdks/go/test/regression/pardo_test.go              | 35 ++++++------
 .../{pardo_test.go => regression_test.go}          | 38 ++-----------
 sdks/go/test/run_validatesrunner_tests.sh          |  4 +-
 9 files changed, 158 insertions(+), 115 deletions(-)

diff --git a/model/pipeline/src/main/proto/schema.proto 
b/model/pipeline/src/main/proto/schema.proto
index 837689f..bcab2e7 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -113,9 +113,9 @@ message LogicalType {
 message Option {
   // REQUIRED. Identifier for the option.
   string name = 1;
-  // OPTIONAL. Type specifer for the structure of value.
-  // If not present, assumes no additional configuration is needed
-  // for this option and value is ignored.
+  // REQUIRED. Type specifier for the structure of value.
+  // Conventionally, options that don't require additional configuration should
+  // use a boolean type, with the value set to true.
   FieldType type = 2;
   FieldValue value = 3;
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index fbbdab3..105eb82 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -472,7 +472,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        if !coder.IsKV(c) {
                                return nil, errors.Errorf("unexpected inject 
coder: %v", c)
                        }
-                       u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), 
ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
+                       valCoder := c.Components[1]
+                       // JIRA BEAM-12438 - an extra LP coder can get added 
here, but isn't added
+                       // on decode. Strip it until we get a better fix.
+                       if valCoder.Kind == coder.LP {
+                               // strip unexpected length prefix coder.
+                               valCoder = valCoder.Components[0]
+                       }
+                       u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), 
ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]}
 
                case graphx.URNExpand:
                        var pid string
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go 
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 087d8c1..2e3ea3f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -356,9 +356,7 @@ func (r *Registry) fromType(ot reflect.Type) 
(*pipepb.Schema, error) {
                schm := ftype.GetRowType().GetSchema()
                schm = proto.Clone(schm).(*pipepb.Schema)
                if ot.Kind() == reflect.Ptr {
-                       schm.Options = append(schm.Options, &pipepb.Option{
-                               Name: optGoNillable,
-                       })
+                       schm.Options = append(schm.Options, optGoNillable())
                }
                if lID != "" {
                        schm.Options = append(schm.Options, logicalOption(lID))
@@ -379,9 +377,7 @@ func (r *Registry) fromType(ot reflect.Type) 
(*pipepb.Schema, error) {
        pt := reflect.PtrTo(t)
        schm = proto.Clone(schm).(*pipepb.Schema)
        schm.Id = getUUID(pt)
-       schm.Options = append(schm.Options, &pipepb.Option{
-               Name: optGoNillable,
-       })
+       schm.Options = append(schm.Options, optGoNillable())
        r.idToType[schm.GetId()] = pt
        r.typeToSchema[pt] = schm
 
@@ -392,14 +388,46 @@ func (r *Registry) fromType(ot reflect.Type) 
(*pipepb.Schema, error) {
 // Schema Option urns.
 const (
        // optGoNillable indicates that this top level schema should be 
returned as a pointer type.
-       optGoNillable = "beam:schema:go:nillable:v1"
+       optGoNillableUrn = "beam:schema:go:nillable:v1"
        // optGoEmbedded indicates that this field is an embedded type.
-       optGoEmbedded = "beam:schema:go:embedded_field:v1"
+       optGoEmbeddedUrn = "beam:schema:go:embedded_field:v1"
        // optGoLogical indicates that this top level schema has a logical type 
equivalent that need to be looked up.
        // It has a value type of String representing the URN for the logical 
type to look up.
-       optGoLogical = "beam:schema:go:logical:v1"
+       optGoLogicalUrn = "beam:schema:go:logical:v1"
 )
 
+func optGoNillable() *pipepb.Option {
+       return newToggleOption(optGoNillableUrn)
+}
+
+func optGoEmbedded() *pipepb.Option {
+       return newToggleOption(optGoEmbeddedUrn)
+}
+
+// newToggleOption constructs an Option whose presence is all
+// that matters, rather than other configuration. The option
+// is not set if the toggle isn't true, so the value is always
+// true.
+func newToggleOption(urn string) *pipepb.Option {
+       return &pipepb.Option{
+               Name: urn,
+               Type: &pipepb.FieldType{
+                       TypeInfo: &pipepb.FieldType_AtomicType{
+                               AtomicType: pipepb.AtomicType_BOOLEAN,
+                       },
+               },
+               Value: &pipepb.FieldValue{
+                       FieldValue: &pipepb.FieldValue_AtomicValue{
+                               AtomicValue: &pipepb.AtomicTypeValue{
+                                       Value: &pipepb.AtomicTypeValue_Boolean{
+                                               Boolean: true,
+                                       },
+                               },
+                       },
+               },
+       }
+}
+
 func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option {
        for _, opt := range opts {
                if opt.GetName() == urn {
@@ -412,7 +440,7 @@ func checkOptions(opts []*pipepb.Option, urn string) 
*pipepb.Option {
 // nillableFromOptions converts the passed in type to it's pointer version
 // if the option is present. This permits go types to be pointers.
 func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
-       if checkOptions(opts, optGoNillable) != nil {
+       if checkOptions(opts, optGoNillableUrn) != nil {
                return reflect.PtrTo(t)
        }
        return nil
@@ -426,7 +454,7 @@ var optGoLogicalType = &pipepb.FieldType{
 
 func logicalOption(lID string) *pipepb.Option {
        return &pipepb.Option{
-               Name: optGoLogical,
+               Name: optGoLogicalUrn,
                Type: optGoLogicalType,
                Value: &pipepb.FieldValue{
                        FieldValue: &pipepb.FieldValue_AtomicValue{
@@ -443,7 +471,7 @@ func logicalOption(lID string) *pipepb.Option {
 // fromLogicalOption returns the logical type id of this top
 // level type if this schema has a logical equivalent.
 func fromLogicalOption(opts []*pipepb.Option) (string, bool) {
-       o := checkOptions(opts, optGoLogical)
+       o := checkOptions(opts, optGoLogicalUrn)
        if o == nil {
                return "", false
        }
@@ -489,7 +517,7 @@ func (r *Registry) structToSchema(t reflect.Type) 
(*pipepb.Schema, error) {
                }
                if isAnon {
                        f = proto.Clone(f).(*pipepb.Field)
-                       f.Options = append(f.Options, &pipepb.Option{Name: 
optGoEmbedded})
+                       f.Options = append(f.Options, optGoEmbedded())
                }
                fields = append(fields, f)
        }
@@ -663,7 +691,7 @@ func (r *Registry) toType(s *pipepb.Schema) (reflect.Type, 
error) {
                if err != nil {
                        return nil, errors.Wrapf(err, "cannot convert schema 
field %v to field", sf.GetName())
                }
-               if checkOptions(sf.Options, optGoEmbedded) != nil {
+               if checkOptions(sf.Options, optGoEmbeddedUrn) != nil {
                        rf.Anonymous = true
                }
                fields = append(fields, rf)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index b298d43..9fe2132 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -404,9 +404,7 @@ func TestSchemaConversion(t *testing.T) {
                                                },
                                        },
                                },
-                               Options: []*pipepb.Option{{
-                                       Name: optGoNillable,
-                               }},
+                               Options: []*pipepb.Option{optGoNillable()},
                        },
                        rt: reflect.TypeOf(&struct {
                                SuperNES int16
@@ -530,9 +528,7 @@ func TestSchemaConversion(t *testing.T) {
                                                },
                                        },
                                },
-                               Options: []*pipepb.Option{{
-                                       Name: optGoNillable,
-                               }, logicalOption("*schema.exportedFunc")},
+                               Options: []*pipepb.Option{optGoNillable(), 
logicalOption("*schema.exportedFunc")},
                        },
                        rt: exportedFuncType,
                }, {
@@ -568,7 +564,7 @@ func TestSchemaConversion(t *testing.T) {
                                Fields: []*pipepb.Field{
                                        {
                                                Name:    "Exported",
-                                               Options: 
[]*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+                                               Options: 
[]*pipepb.Option{optGoEmbedded()},
                                                Type: &pipepb.FieldType{
                                                        TypeInfo: 
&pipepb.FieldType_RowType{
                                                                RowType: 
&pipepb.RowType{
@@ -610,7 +606,7 @@ func TestSchemaConversion(t *testing.T) {
                                Fields: []*pipepb.Field{
                                        {
                                                Name:    "Exported",
-                                               Options: 
[]*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+                                               Options: 
[]*pipepb.Option{optGoEmbedded()},
                                                Type: &pipepb.FieldType{
                                                        Nullable: true,
                                                        TypeInfo: 
&pipepb.FieldType_RowType{
@@ -660,9 +656,7 @@ func TestSchemaConversion(t *testing.T) {
                                                },
                                        },
                                },
-                               Options: []*pipepb.Option{{
-                                       Name: optGoNillable,
-                               }},
+                               Options: []*pipepb.Option{optGoNillable()},
                        },
                        rt: reflect.TypeOf(&struct {
                                myInt
diff --git a/sdks/go/test/regression/lperror.go 
b/sdks/go/test/regression/lperror.go
new file mode 100644
index 0000000..4e27f97
--- /dev/null
+++ b/sdks/go/test/regression/lperror.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package regression
+
+import (
+       "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+// REPRO found by https://github.com/zelliott
+
+type fruit struct {
+       Name string
+}
+
+func toFoo(id int, _ func(**fruit) bool) (int, string) {
+       return id, "Foo"
+}
+
+func toID(id int, fruitIter func(**fruit) bool, _ func(*string) bool) int {
+       var fruit *fruit
+       for fruitIter(&fruit) {
+       }
+       return id
+}
+
+// LPErrorPipeline constructs a pipeline that has a GBK followed by a CoGBK 
using the same
+// input, with schema encoded structs as elements. This ends up having the 
stage after the
+// CoGBK fail since the decoder post-cogbk is missing a Length Prefix coder 
that was
+// applied to the GBK input, but not the CoGBK output.
+// The root cause is likely that there is no Beam standard CoGBK format for inject and 
expand.
+// JIRA: BEAM-12438
+func LPErrorPipeline(s beam.Scope) beam.PCollection {
+       // ["Apple", "Banana", "Cherry"]
+       fruits := beam.CreateList(s, []*fruit{{"Apple"}, {"Banana"}, 
{"Cherry"}})
+
+       // [0 "Apple", 0 "Banana", 0 "Cherry"]
+       fruitsKV := beam.AddFixedKey(s, fruits)
+
+       // [0 ["Apple", "Banana", "Cherry"]]
+       fruitsGBK := beam.GroupByKey(s, fruitsKV)
+
+       // [0 "Foo"]
+       fooKV := beam.ParDo(s, toFoo, fruitsGBK)
+
+       // [0 ["Foo"] ["Apple", "Banana", "Cherry"]]
+       fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
+
+       // [0]
+       return beam.ParDo(s, toID, fruitsFooCoGBK)
+}
diff --git a/sdks/go/test/regression/pardo_test.go 
b/sdks/go/test/regression/lperror_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/lperror_test.go
index 322dd69..773570d 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/lperror_test.go
@@ -18,41 +18,23 @@ package regression
 import (
        "testing"
 
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
-)
-
-func TestDirectParDo(t *testing.T) {
-       if err := ptest.Run(DirectParDo()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestEmitParDo(t *testing.T) {
-       if err := ptest.Run(EmitParDo()); err != nil {
-               t.Error(err)
-       }
-}
+       "github.com/apache/beam/sdks/go/test/integration"
 
-func TestMultiEmitParDo(t *testing.T) {
-       if err := ptest.Run(MultiEmitParDo()); err != nil {
-               t.Error(err)
-       }
-}
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
+)
 
-func TestMixedOutputParDo(t *testing.T) {
-       if err := ptest.Run(MixedOutputParDo()); err != nil {
-               t.Error(err)
-       }
-}
+func TestLPErrorPipeline(t *testing.T) {
+       integration.CheckFilters(t)
 
-func TestDirectParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
-}
+       pipeline, s := beam.NewPipelineWithRoot()
+       want := beam.CreateList(s, []int{0})
+       got := LPErrorPipeline(s)
+       passert.Equals(s, got, want)
 
-func TestEmitParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
+       ptest.RunAndValidate(t, pipeline)
 }
diff --git a/sdks/go/test/regression/pardo_test.go 
b/sdks/go/test/regression/pardo_test.go
index 322dd69..cbcb81c 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/pardo_test.go
@@ -19,40 +19,39 @@ import (
        "testing"
 
        "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/go/test/integration"
+
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
 )
 
 func TestDirectParDo(t *testing.T) {
-       if err := ptest.Run(DirectParDo()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, DirectParDo())
 }
 
 func TestEmitParDo(t *testing.T) {
-       if err := ptest.Run(EmitParDo()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, EmitParDo())
 }
 
 func TestMultiEmitParDo(t *testing.T) {
-       if err := ptest.Run(MultiEmitParDo()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, MultiEmitParDo())
 }
 
 func TestMixedOutputParDo(t *testing.T) {
-       if err := ptest.Run(MixedOutputParDo()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, MixedOutputParDo())
 }
 
 func TestDirectParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, DirectParDoAfterGBK())
 }
 
 func TestEmitParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
+       integration.CheckFilters(t)
+       ptest.RunAndValidate(t, EmitParDoAfterGBK())
 }
diff --git a/sdks/go/test/regression/pardo_test.go 
b/sdks/go/test/regression/regression_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/regression_test.go
index 322dd69..132e9a8 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/regression_test.go
@@ -21,38 +21,8 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
 )
 
-func TestDirectParDo(t *testing.T) {
-       if err := ptest.Run(DirectParDo()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestEmitParDo(t *testing.T) {
-       if err := ptest.Run(EmitParDo()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestMultiEmitParDo(t *testing.T) {
-       if err := ptest.Run(MultiEmitParDo()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestMixedOutputParDo(t *testing.T) {
-       if err := ptest.Run(MixedOutputParDo()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestDirectParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
-}
-
-func TestEmitParDoAfterGBK(t *testing.T) {
-       if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-               t.Error(err)
-       }
+// TestMain invokes ptest.Main to allow running these tests on
+// non-direct runners.
+func TestMain(m *testing.M) {
+       ptest.Main(m)
 }
diff --git a/sdks/go/test/run_validatesrunner_tests.sh 
b/sdks/go/test/run_validatesrunner_tests.sh
index a43898e..00ec453 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -310,11 +310,11 @@ if [[ "$JENKINS" == true ]]; then
   cd ./src
 
   echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
-  GOPATH=$TEMP_GOPATH go test -v 
github.com/apache/beam/sdks/go/test/integration/... $ARGS \
+  GOPATH=$TEMP_GOPATH go test -v 
github.com/apache/beam/sdks/go/test/integration/... 
github.com/apache/beam/sdks/go/test/regression $ARGS \
       || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before 
exiting
 else
   echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
-  go test -v ./sdks/go/test/integration/... $ARGS \
+  go test -v ./sdks/go/test/integration/... ./sdks/go/test/regression  $ARGS \
       || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before 
exiting
 fi
 

Reply via email to