This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a571952 [BEAM-12438] Add Regression test for issue around LP coding Row coders. (#14924) a571952 is described below commit a571952e3ce470b3871ebd333eacb3d6368d2737 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Mon Jun 7 09:45:26 2021 -0700 [BEAM-12438] Add Regression test for issue around LP coding Row coders. (#14924) * [BEAM-12438] Run regression during integration * [BEAM-12438] Add LP error repro * [BEAM-12438] Ignore extra LP on injects. * [BEAM-12438] Populate schema option types. Co-authored-by: zelliott --- model/pipeline/src/main/proto/schema.proto | 6 +-- sdks/go/pkg/beam/core/runtime/exec/translate.go | 9 +++- .../pkg/beam/core/runtime/graphx/schema/schema.go | 56 ++++++++++++++----- .../beam/core/runtime/graphx/schema/schema_test.go | 16 ++---- sdks/go/test/regression/lperror.go | 63 ++++++++++++++++++++++ .../regression/{pardo_test.go => lperror_test.go} | 46 +++++----------- sdks/go/test/regression/pardo_test.go | 35 ++++++------ .../{pardo_test.go => regression_test.go} | 38 ++----------- sdks/go/test/run_validatesrunner_tests.sh | 4 +- 9 files changed, 158 insertions(+), 115 deletions(-) diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto index 837689f..bcab2e7 100644 --- a/model/pipeline/src/main/proto/schema.proto +++ b/model/pipeline/src/main/proto/schema.proto @@ -113,9 +113,9 @@ message LogicalType { message Option { // REQUIRED. Identifier for the option. string name = 1; - // OPTIONAL. Type specifer for the structure of value. - // If not present, assumes no additional configuration is needed - // for this option and value is ignored. + // REQUIRED. Type specifer for the structure of value. + // Conventionally, options that don't require additional configuration should + // use a boolean type, with the value set to true. FieldType type = 2; FieldValue value = 3; } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index fbbdab3..105eb82 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -472,7 +472,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { if !coder.IsKV(c) { return nil, errors.Errorf("unexpected inject coder: %v", c) } - u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]} + valCoder := c.Components[1] + // JIRA BEAM-12438 - an extra LP coder can get added here, but isn't added + // on decode. Strip them until we get a better fix. + if valCoder.Kind == coder.LP { + // strip unexpected length prefix coder. + valCoder = valCoder.Components[0] + } + u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]} case graphx.URNExpand: var pid string diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go index 087d8c1..2e3ea3f 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go @@ -356,9 +356,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) { schm := ftype.GetRowType().GetSchema() schm = proto.Clone(schm).(*pipepb.Schema) if ot.Kind() == reflect.Ptr { - schm.Options = append(schm.Options, &pipepb.Option{ - Name: optGoNillable, - }) + schm.Options = append(schm.Options, optGoNillable()) } if lID != "" { schm.Options = append(schm.Options, logicalOption(lID)) @@ -379,9 +377,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) { pt := reflect.PtrTo(t) schm = proto.Clone(schm).(*pipepb.Schema) schm.Id = getUUID(pt) - schm.Options = append(schm.Options, &pipepb.Option{ - Name: optGoNillable, - }) + schm.Options = append(schm.Options, optGoNillable()) r.idToType[schm.GetId()] = pt r.typeToSchema[pt] = schm @@ -392,14 +388,46 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) { // Schema Option urns. const ( // optGoNillable indicates that this top level schema should be returned as a pointer type. - optGoNillable = "beam:schema:go:nillable:v1" + optGoNillableUrn = "beam:schema:go:nillable:v1" // optGoEmbedded indicates that this field is an embedded type. - optGoEmbedded = "beam:schema:go:embedded_field:v1" + optGoEmbeddedUrn = "beam:schema:go:embedded_field:v1" // optGoLogical indicates that this top level schema has a logical type equivalent that need to be looked up. // It has a value type of String representing the URN for the logical type to look up. - optGoLogical = "beam:schema:go:logical:v1" + optGoLogicalUrn = "beam:schema:go:logical:v1" ) +func optGoNillable() *pipepb.Option { + return newToggleOption(optGoNillableUrn) +} + +func optGoEmbedded() *pipepb.Option { + return newToggleOption(optGoEmbeddedUrn) +} + +// newToggleOption constructs an Option whose presence is all +// that matters, rather than other configuration. The option +// is not set if the toggle isn't true, so the value is always +// true. +func newToggleOption(urn string) *pipepb.Option { + return &pipepb.Option{ + Name: urn, + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_BOOLEAN, + }, + }, + Value: &pipepb.FieldValue{ + FieldValue: &pipepb.FieldValue_AtomicValue{ + AtomicValue: &pipepb.AtomicTypeValue{ + Value: &pipepb.AtomicTypeValue_Boolean{ + Boolean: true, + }, + }, + }, + }, + } +} + func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option { for _, opt := range opts { if opt.GetName() == urn { @@ -412,7 +440,7 @@ func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option { // nillableFromOptions converts the passed in type to it's pointer version // if the option is present. This permits go types to be pointers. func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type { - if checkOptions(opts, optGoNillable) != nil { + if checkOptions(opts, optGoNillableUrn) != nil { return reflect.PtrTo(t) } return nil @@ -426,7 +454,7 @@ var optGoLogicalType = &pipepb.FieldType{ func logicalOption(lID string) *pipepb.Option { return &pipepb.Option{ - Name: optGoLogical, + Name: optGoLogicalUrn, Type: optGoLogicalType, Value: &pipepb.FieldValue{ FieldValue: &pipepb.FieldValue_AtomicValue{ @@ -443,7 +471,7 @@ func logicalOption(lID string) *pipepb.Option { // fromLogicalOption returns the logical type id of this top // level type if this schema has a logical equivalent. func fromLogicalOption(opts []*pipepb.Option) (string, bool) { - o := checkOptions(opts, optGoLogical) + o := checkOptions(opts, optGoLogicalUrn) if o == nil { return "", false } @@ -489,7 +517,7 @@ func (r *Registry) structToSchema(t reflect.Type) (*pipepb.Schema, error) { } if isAnon { f = proto.Clone(f).(*pipepb.Field) - f.Options = append(f.Options, &pipepb.Option{Name: optGoEmbedded}) + f.Options = append(f.Options, optGoEmbedded()) } fields = append(fields, f) } @@ -663,7 +691,7 @@ func (r *Registry) toType(s *pipepb.Schema) (reflect.Type, error) { if err != nil { return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName()) } - if checkOptions(sf.Options, optGoEmbedded) != nil { + if checkOptions(sf.Options, optGoEmbeddedUrn) != nil { rf.Anonymous = true } fields = append(fields, rf) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go index b298d43..9fe2132 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go @@ -404,9 +404,7 @@ func TestSchemaConversion(t *testing.T) { }, }, }, - Options: []*pipepb.Option{{ - Name: optGoNillable, - }}, + Options: []*pipepb.Option{optGoNillable()}, }, rt: reflect.TypeOf(&struct { SuperNES int16 @@ -530,9 +528,7 @@ func TestSchemaConversion(t *testing.T) { }, }, }, - Options: []*pipepb.Option{{ - Name: optGoNillable, - }, logicalOption("*schema.exportedFunc")}, + Options: []*pipepb.Option{optGoNillable(), logicalOption("*schema.exportedFunc")}, }, rt: exportedFuncType, }, { @@ -568,7 +564,7 @@ func TestSchemaConversion(t *testing.T) { Fields: []*pipepb.Field{ { Name: "Exported", - Options: []*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}}, + Options: []*pipepb.Option{optGoEmbedded()}, Type: &pipepb.FieldType{ TypeInfo: &pipepb.FieldType_RowType{ RowType: &pipepb.RowType{ @@ -610,7 +606,7 @@ func TestSchemaConversion(t *testing.T) { Fields: []*pipepb.Field{ { Name: "Exported", - Options: []*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}}, + Options: []*pipepb.Option{optGoEmbedded()}, Type: &pipepb.FieldType{ Nullable: true, TypeInfo: &pipepb.FieldType_RowType{ @@ -660,9 +656,7 @@ func TestSchemaConversion(t *testing.T) { }, }, }, - Options: []*pipepb.Option{{ - Name: optGoNillable, - }}, + Options: []*pipepb.Option{optGoNillable()}, }, rt: reflect.TypeOf(&struct { myInt diff --git a/sdks/go/test/regression/lperror.go b/sdks/go/test/regression/lperror.go new file mode 100644 index 0000000..4e27f97 --- /dev/null +++ b/sdks/go/test/regression/lperror.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package regression + +import ( + "github.com/apache/beam/sdks/go/pkg/beam" +) + +// REPRO found by https://github.com/zelliott + +type fruit struct { + Name string +} + +func toFoo(id int, _ func(**fruit) bool) (int, string) { + return id, "Foo" +} + +func toID(id int, fruitIter func(**fruit) bool, _ func(*string) bool) int { + var fruit *fruit + for fruitIter(&fruit) { + } + return id +} + +// LPErrorPipeline constructs a pipeline that has a GBK followed by a CoGBK using the same +// input, with schema encoded structs as elements. This ends up having the stage after the +// CoGBK fail since the decoder post-cogbk is missing a Length Prefix coder that was +// applied to the GBK input, but not the CoGBK output. +// Root is likely in that there's no Beam standard CoGBK format for inject and expand. +// JIRA: BEAM-12438 +func LPErrorPipeline(s beam.Scope) beam.PCollection { + // ["Apple", "Banana", "Cherry"] + fruits := beam.CreateList(s, []*fruit{{"Apple"}, {"Banana"}, {"Cherry"}}) + + // [0 "Apple", 0 "Banana", 0 "Cherry"] + fruitsKV := beam.AddFixedKey(s, fruits) + + // [0 ["Apple", "Banana", "Cherry"]] + fruitsGBK := beam.GroupByKey(s, fruitsKV) + + // [0 "Foo"] + fooKV := beam.ParDo(s, toFoo, fruitsGBK) + + // [0 ["Foo"] ["Apple", "Banana", "Cherry"]] + fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV) + + // [0] + return beam.ParDo(s, toID, fruitsFooCoGBK) +} diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/lperror_test.go similarity index 56% copy from sdks/go/test/regression/pardo_test.go copy to sdks/go/test/regression/lperror_test.go index 322dd69..773570d 100644 --- a/sdks/go/test/regression/pardo_test.go +++ b/sdks/go/test/regression/lperror_test.go @@ -18,41 +18,23 @@ package regression import ( "testing" + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest" -) - -func TestDirectParDo(t *testing.T) { - if err := ptest.Run(DirectParDo()); err != nil { - t.Error(err) - } -} - -func TestEmitParDo(t *testing.T) { - if err := ptest.Run(EmitParDo()); err != nil { - t.Error(err) - } -} + "github.com/apache/beam/sdks/go/test/integration" -func TestMultiEmitParDo(t *testing.T) { - if err := ptest.Run(MultiEmitParDo()); err != nil { - t.Error(err) - } -} + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark" +) -func TestMixedOutputParDo(t *testing.T) { - if err := ptest.Run(MixedOutputParDo()); err != nil { - t.Error(err) - } -} +func TestLPErrorPipeline(t *testing.T) { + integration.CheckFilters(t) -func TestDirectParDoAfterGBK(t *testing.T) { - if err := ptest.Run(DirectParDoAfterGBK()); err != nil { - t.Error(err) - } -} + pipeline, s := beam.NewPipelineWithRoot() + want := beam.CreateList(s, []int{0}) + got := LPErrorPipeline(s) + passert.Equals(s, got, want) -func TestEmitParDoAfterGBK(t *testing.T) { - if err := ptest.Run(EmitParDoAfterGBK()); err != nil { - t.Error(err) - } + ptest.RunAndValidate(t, pipeline) } diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/pardo_test.go index 322dd69..cbcb81c 100644 --- a/sdks/go/test/regression/pardo_test.go +++ b/sdks/go/test/regression/pardo_test.go @@ -19,40 +19,39 @@ import ( "testing" "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/go/test/integration" + + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark" ) func TestDirectParDo(t *testing.T) { - if err := ptest.Run(DirectParDo()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, DirectParDo()) } func TestEmitParDo(t *testing.T) { - if err := ptest.Run(EmitParDo()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, EmitParDo()) } func TestMultiEmitParDo(t *testing.T) { - if err := ptest.Run(MultiEmitParDo()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, MultiEmitParDo()) } func TestMixedOutputParDo(t *testing.T) { - if err := ptest.Run(MixedOutputParDo()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, MixedOutputParDo()) } func TestDirectParDoAfterGBK(t *testing.T) { - if err := ptest.Run(DirectParDoAfterGBK()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, DirectParDoAfterGBK()) } func TestEmitParDoAfterGBK(t *testing.T) { - if err := ptest.Run(EmitParDoAfterGBK()); err != nil { - t.Error(err) - } + integration.CheckFilters(t) + ptest.RunAndValidate(t, EmitParDoAfterGBK()) } diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/regression_test.go similarity index 56% copy from sdks/go/test/regression/pardo_test.go copy to sdks/go/test/regression/regression_test.go index 322dd69..132e9a8 100644 --- a/sdks/go/test/regression/pardo_test.go +++ b/sdks/go/test/regression/regression_test.go @@ -21,38 +21,8 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest" ) -func TestDirectParDo(t *testing.T) { - if err := ptest.Run(DirectParDo()); err != nil { - t.Error(err) - } -} - -func TestEmitParDo(t *testing.T) { - if err := ptest.Run(EmitParDo()); err != nil { - t.Error(err) - } -} - -func TestMultiEmitParDo(t *testing.T) { - if err := ptest.Run(MultiEmitParDo()); err != nil { - t.Error(err) - } -} - -func TestMixedOutputParDo(t *testing.T) { - if err := ptest.Run(MixedOutputParDo()); err != nil { - t.Error(err) - } -} - -func TestDirectParDoAfterGBK(t *testing.T) { - if err := ptest.Run(DirectParDoAfterGBK()); err != nil { - t.Error(err) - } -} - -func TestEmitParDoAfterGBK(t *testing.T) { - if err := ptest.Run(EmitParDoAfterGBK()); err != nil { - t.Error(err) - } +// TestMain invokes ptest.Main to allow running these tests on +// non-direct runners. +func TestMain(m *testing.M) { + ptest.Main(m) } diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index a43898e..00ec453 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -310,11 +310,11 @@ if [[ "$JENKINS" == true ]]; then cd ./src echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS" - GOPATH=$TEMP_GOPATH go test -v github.com/apache/beam/sdks/go/test/integration/... $ARGS \ + GOPATH=$TEMP_GOPATH go test -v github.com/apache/beam/sdks/go/test/integration/... github.com/apache/beam/sdks/go/test/regression $ARGS \ || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting else echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS" - go test -v ./sdks/go/test/integration/... $ARGS \ + go test -v ./sdks/go/test/integration/... ./sdks/go/test/regression $ARGS \ || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting fi