[ https://issues.apache.org/jira/browse/BEAM-9615?focusedWorklogId=527944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-527944 ]
ASF GitHub Bot logged work on BEAM-9615:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Dec/20 02:27
Start Date: 24/Dec/20 02:27
Worklog Time Spent: 10m
Work Description: youngoli commented on a change in pull request #13611:
URL: https://github.com/apache/beam/pull/13611#discussion_r548338119
##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// RowDecoderBuilder allows one to build Beam Schema row decoders for provided types.
+type RowDecoderBuilder struct {
+ allFuncs map[reflect.Type]decoderProvider
+ ifaceFuncs []reflect.Type
+}
+
+type decoderProvider = func(reflect.Type) (func(io.Reader) (interface{}, error), error)
Review comment:
What's the purpose of having `allFuncs` contain `decoderProvider` instead
of `func(io.Reader) (interface{}, error)` directly? Is it to allow for
interface types, so that the `decoderProvider` for an interface type can return
different decoders depending on the concrete type it is passed?
##########
File path: sdks/go/pkg/beam/core/graph/coder/row_encoder.go
##########
@@ -0,0 +1,231 @@
+package coder
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type encoderProvider = func(reflect.Type) (func(interface{}, io.Writer) error, error)
+
+// RowEncoderBuilder allows one to build Beam Schema row encoders for provided types.
+type RowEncoderBuilder struct {
+ allFuncs map[reflect.Type]encoderProvider
+ ifaceFuncs []reflect.Type
+}
+
+// Register accepts a provider for the given type to schema encode values of that type.
+//
+// When generating encoding functions, this builder will first check for exact type
+// matches, then against interfaces with registered factories in recency order of
+// registration, and then finally use the default Beam Schema encoding behavior.
+//
+// TODO(BEAM-9615): Add final factory types. This interface is subject to change.
+// Currently f must be a function of the type func(reflect.Type) (func(interface{}, io.Writer) error, error).
+func (b *RowEncoderBuilder) Register(rt reflect.Type, f interface{}) {
+ fe, ok := f.(encoderProvider)
+ if !ok {
+ panic(fmt.Sprintf("%v isn't a supported decoder function type (passed with %T)", f, rt))
Review comment:
```suggestion
panic(fmt.Sprintf("%v isn't a supported encoder function type (passed with %T)", f, rt))
```
##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil_test.go
##########
@@ -0,0 +1,126 @@
+package testutil
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+type UserInterface interface {
+ mark()
+}
+
+type UserType1 struct {
+ A string
+ B int
+ C string
+}
+
+func (UserType1) mark() {}
+
+func ut1EncDropB(val interface{}, w io.Writer) error {
+ if err := coder.WriteSimpleRowHeader(2, w); err != nil {
+ return err
+ }
+ elm := val.(UserType1)
+ if err := coder.EncodeStringUTF8(elm.A, w); err != nil {
+ return err
+ }
+ if err := coder.EncodeStringUTF8(elm.C, w); err != nil {
+ return err
+ }
+ return nil
+}
+
+func ut1DecDropB(r io.Reader) (interface{}, error) {
+ if err := coder.ReadSimpleRowHeader(2, r); err != nil {
+ return nil, err
+ }
+ a, err := coder.DecodeStringUTF8(r)
+ if err != nil {
+ return nil, fmt.Errorf("decoding string field A: %w", err)
+ }
+ c, err := coder.DecodeStringUTF8(r)
+ if err != nil {
+ return nil, fmt.Errorf("decoding string field C: %v, %w", c, err)
+ }
+ return UserType1{
+ A: a,
+ B: 42,
+ C: c,
+ }, nil
+}
+
+// TestValidateCoder_SingleValue checks that the validate coder fun will
+func TestValidateCoder(t *testing.T) {
+ // Validates a custom UserType1 encoding, which drops encoding the "B" field,
+ // always setting it to a constant value.
+ t.Run("SingleValue", func(t *testing.T) {
+ (&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserType1)(nil)).Elem(),
+ func(reflect.Type) (func(interface{}, io.Writer) error, error) { return ut1EncDropB, nil },
+ func(reflect.Type) (func(io.Reader) (interface{}, error), error) { return ut1DecDropB, nil },
+ struct{ A, C string }{},
+ UserType1{
+ A: "cats",
+ B: 42,
+ C: "pjamas",
+ },
+ )
+ })
+ t.Run("SliceOfValues", func(t *testing.T) {
+ (&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserType1)(nil)).Elem(),
+ func(reflect.Type) (func(interface{}, io.Writer) error, error) { return ut1EncDropB, nil },
+ func(reflect.Type) (func(io.Reader) (interface{}, error), error) { return ut1DecDropB, nil },
+ struct{ A, C string }{},
+ []UserType1{
+ {
+ A: "cats",
+ B: 42,
+ C: "pjamas",
+ }, {
+ A: "dogs",
+ B: 42,
+ C: "breakfast",
+ }, {
+ A: "fish",
+ B: 42,
+ C: "plenty of",
+ },
+ },
+ )
+ })
+ t.Run("InterfaceCoder", func(t *testing.T) {
+ (&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserInterface)(nil)).Elem(),
+ func(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+ return ut1EncDropB, nil
+ },
+ func(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+ return ut1DecDropB, nil
+ },
+ struct{ A, C string }{},
+ UserType1{
+ A: "cats",
+ B: 42,
+ C: "pjamas",
+ },
+ )
+ })
+}
Review comment:
Suggestion: You could probably add a test for the failure case, though
you'll need to do some refactoring of the Validate function. Basically, have
an inner function that's the current implementation but doesn't use
`*testing.T` and instead returns an error if something went wrong, then make the
current exported `Validate` function call that and do `t.Fatalf` if an error
appears. With that refactor, you can add a test that calls the inner function
with an element that doesn't have 42 for `B`, so you can validate that it fails
when expected.
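A rough sketch of that refactor, not the PR's actual code: `userType`, `roundTripDropB`, `validate` and the small `failer` interface are hypothetical stand-ins so the example runs on its own. In the real package the wrapper would take `*testing.T`, which satisfies `failer`.

```go
package main

import (
	"fmt"
	"reflect"
)

// failer is the subset of *testing.T the wrapper needs (hypothetical helper).
type failer interface {
	Helper()
	Fatalf(format string, args ...interface{})
}

// userType stands in for UserType1 from the test file.
type userType struct {
	A string
	B int
	C string
}

// roundTripDropB mimics a coder that drops B and restores it as 42.
func roundTripDropB(v userType) userType {
	return userType{A: v.A, B: 42, C: v.C}
}

// validate holds the checking logic and reports failures as errors,
// so it can be exercised for both passing and failing inputs.
func validate(values ...userType) error {
	if len(values) == 0 {
		return fmt.Errorf("no test values provided")
	}
	for i, want := range values {
		if got := roundTripDropB(want); !reflect.DeepEqual(got, want) {
			return fmt.Errorf("value %d: round trip = %+v, want %+v", i, got, want)
		}
	}
	return nil
}

// Validate is the thin exported wrapper that turns an error into a test failure.
func Validate(t failer, values ...userType) {
	t.Helper()
	if err := validate(values...); err != nil {
		t.Fatalf("Validate failed: %v", err)
	}
}

func main() {
	fmt.Println(validate(userType{A: "cats", B: 42, C: "pjamas"}))
	fmt.Println(validate(userType{A: "cats", B: 7, C: "pjamas"}) != nil)
}
```

A failure-case test would then call the inner function with `B` set to something other than 42 and assert that the returned error is non-nil.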
##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil.go
##########
@@ -0,0 +1,136 @@
+// Package testutil contains helpers to test and validate custom Beam Schema coders.
+package testutil
+
+import (
+ "bytes"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/google/go-cmp/cmp"
+)
+
+// SchemaCoder helps validate custom schema coders.
+type SchemaCoder struct {
+ encBldUT, encBldSchema coder.RowEncoderBuilder
+ decBldUT, decBldSchema coder.RowDecoderBuilder
+
+ // CmpOptions to pass into the round trip comparison
+ CmpOptions cmp.Options
+}
+
+// Register adds additional custom types not under test to both the under test
+// and default schema coders.
+func (v *SchemaCoder) Register(rt reflect.Type, encF, decF interface{}) {
+ v.encBldUT.Register(rt, encF)
+ v.encBldSchema.Register(rt, encF)
+ v.decBldUT.Register(rt, decF)
+ v.decBldSchema.Register(rt, decF)
+}
+
+// Validate is a test utility to validate custom schema coders generate
+// beam schema encoded bytes.
+//
+// Validate accepts the reflect.Type to register, factory functions for encoding and decoding, an
+// anonymous struct type equivalent to the encoded format produced and consumed by the factory produced functions
+// and test values. Values must be a single struct or pointer to struct.
Review comment:
This line reads like the parameter `values` shouldn't be a slice, but if
I'm reading the code correctly it's supposed to mean that each element in
`values` must be a single struct or pointer to struct. I recommend rephrasing
it along these lines:
> Values must be either a struct, pointer to struct, or a slice where each
> element is a struct or pointer to struct.
##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil_test.go
##########
@@ -0,0 +1,126 @@
+package testutil
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+type UserInterface interface {
+ mark()
+}
+
+type UserType1 struct {
+ A string
+ B int
+ C string
+}
+
+func (UserType1) mark() {}
+
+func ut1EncDropB(val interface{}, w io.Writer) error {
+ if err := coder.WriteSimpleRowHeader(2, w); err != nil {
+ return err
+ }
+ elm := val.(UserType1)
+ if err := coder.EncodeStringUTF8(elm.A, w); err != nil {
+ return err
+ }
+ if err := coder.EncodeStringUTF8(elm.C, w); err != nil {
+ return err
+ }
+ return nil
+}
+
+func ut1DecDropB(r io.Reader) (interface{}, error) {
+ if err := coder.ReadSimpleRowHeader(2, r); err != nil {
+ return nil, err
+ }
+ a, err := coder.DecodeStringUTF8(r)
+ if err != nil {
+ return nil, fmt.Errorf("decoding string field A: %w", err)
+ }
+ c, err := coder.DecodeStringUTF8(r)
+ if err != nil {
+ return nil, fmt.Errorf("decoding string field C: %v, %w", c, err)
+ }
+ return UserType1{
+ A: a,
+ B: 42,
+ C: c,
+ }, nil
+}
+
+// TestValidateCoder_SingleValue checks that the validate coder fun will
Review comment:
Typo: Unfinished comment.
##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -0,0 +1,275 @@
+package coder
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// RowDecoderBuilder allows one to build Beam Schema row decoders for provided types.
+type RowDecoderBuilder struct {
+ allFuncs map[reflect.Type]decoderProvider
+ ifaceFuncs []reflect.Type
+}
+
+type decoderProvider = func(reflect.Type) (func(io.Reader) (interface{}, error), error)
+
+// Register accepts a provider to decode schema encoded values
+// of that type.
+//
+// When decoding values, decoder functions produced by this builder will
+// first check for exact type matches, then interfaces implemented by
+// the type in recency order of registration, and then finally the
+// default Beam Schema encoding behavior.
+//
+// TODO(BEAM-9615): Add final factory types. This interface is subject to change.
+// Currently f must be a function func(reflect.Type) (func(io.Reader) (interface{}, error), error)
+func (b *RowDecoderBuilder) Register(rt reflect.Type, f interface{}) {
+ fd, ok := f.(decoderProvider)
+ if !ok {
+ panic(fmt.Sprintf("%v isn't a supported decoder function type (passed with %T)", f, rt))
+ }
+
+ if rt.Kind() == reflect.Interface && rt.NumMethod() == 0 {
+ panic(fmt.Sprintf("interface type %v must have methods", rt))
+ }
+
+ if b.allFuncs == nil {
+ b.allFuncs = make(map[reflect.Type]decoderProvider)
+ }
+ b.allFuncs[rt] = fd
+ if rt.Kind() == reflect.Interface {
+ b.ifaceFuncs = append(b.ifaceFuncs, rt)
+ }
+}
+
+// Build constructs a Beam Schema coder for the given type, using any providers registered for
+// itself or its fields.
+func (b *RowDecoderBuilder) Build(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+ if err := rowTypeValidation(rt, true); err != nil {
+ return nil, err
+ }
+ return b.decoderForType(rt), nil
+}
+
+// decoderForType returns a decoder function for the struct or pointer to struct type.
+func (b *RowDecoderBuilder) decoderForType(t reflect.Type) func(io.Reader) (interface{}, error) {
+ // Check if there are any providers registered for this type, or that this type adheres to any interfaces.
+ if f := b.customFunc(t); f != nil {
+ return f
+ }
+
+ var isPtr bool
+ // Pointers become the value type for decomposition.
+ if t.Kind() == reflect.Ptr {
+ isPtr = true
+ t = t.Elem()
+ }
+ dec := b.decoderForStructReflect(t)
+
+ if isPtr {
+ return func(r io.Reader) (interface{}, error) {
+ rv := reflect.New(t)
+ err := dec(rv.Elem(), r)
+ return rv.Interface(), err
+ }
+ }
+ return func(r io.Reader) (interface{}, error) {
+ rv := reflect.New(t)
+ err := dec(rv.Elem(), r)
+ return rv.Elem().Interface(), err
+ }
+}
+
+// decoderForStructReflect returns a reflection based decoder function for the
+// given struct type.
+func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+ var coder typeDecoderReflect
+ for i := 0; i < t.NumField(); i++ {
+ i := i // avoid alias issues in the closures.
+ dec := b.decoderForSingleTypeReflect(t.Field(i).Type)
+ coder.fields = append(coder.fields, func(rv reflect.Value, r io.Reader) error {
+ return dec(rv.Field(i), r)
+ })
+ }
+ return func(rv reflect.Value, r io.Reader) error {
+ nf, nils, err := ReadRowHeader(r)
+ if err != nil {
+ return err
+ }
+ if nf != len(coder.fields) {
+ return errors.Errorf("schema[%v] changed: got %d fields, want %d fields", "TODO", nf, len(coder.fields))
+ }
+ for i, f := range coder.fields {
+ if IsFieldNil(nils, i) {
+ continue
+ }
+ if err := f(rv, r); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+}
+
+func reflectDecodeBool(rv reflect.Value, r io.Reader) error {
+ v, err := DecodeBool(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding bool field")
+ }
+ rv.SetBool(v)
+ return nil
+}
+
+func reflectDecodeByte(rv reflect.Value, r io.Reader) error {
+ b, err := DecodeByte(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding single byte field")
+ }
+ rv.SetUint(uint64(b))
+ return nil
+}
+
+func reflectDecodeString(rv reflect.Value, r io.Reader) error {
+ v, err := DecodeStringUTF8(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding string field")
+ }
+ rv.SetString(v)
+ return nil
+}
+
+func reflectDecodeInt(rv reflect.Value, r io.Reader) error {
+ v, err := DecodeVarInt(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding varint field")
+ }
+ rv.SetInt(v)
+ return nil
+}
+
+func reflectDecodeFloat(rv reflect.Value, r io.Reader) error {
+ v, err := DecodeDouble(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding double field")
+ }
+ rv.SetFloat(v)
+ return nil
+}
+
+// customFunc returns nil if no custom func exists for this.
+func (b *RowDecoderBuilder) customFunc(t reflect.Type) func(io.Reader) (interface{}, error) {
+ if fact, ok := b.allFuncs[t]; ok {
+ f, err := fact(t)
+
+ // TODO handle errors?
+ if err != nil {
+ return nil
+ }
+ return f
+ }
+ // Check satisfaction of interface types in reverse registration order.
+ for i := len(b.ifaceFuncs) - 1; i >= 0; i-- {
+ it := b.ifaceFuncs[i]
+ if ok := t.AssignableTo(it); ok {
+ if fact, ok := b.allFuncs[it]; ok {
+ f, err := fact(t)
+ // TODO handle errors?
+ if err != nil {
+ return nil
+ }
+ return f
+ }
+ }
+ }
+ return nil
+}
+
+// decoderForSingleTypeReflect returns a reflection based decoder function for the
+// given type.
+func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+ // Check if there are any providers registered for this type, or that this type adheres to any interfaces.
+ if dec := b.customFunc(t); dec != nil {
+ return func(v reflect.Value, r io.Reader) error {
+ elm, err := dec(r)
+ if err != nil {
+ return err
+ }
+ v.Set(reflect.ValueOf(elm))
+ return nil
+ }
+ }
+
+ switch t.Kind() {
+ case reflect.Struct:
+ return b.decoderForStructReflect(t)
+ case reflect.Bool:
+ return reflectDecodeBool
+ case reflect.Uint8:
+ return reflectDecodeByte
+ case reflect.String:
+ return reflectDecodeString
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ return reflectDecodeInt
+ case reflect.Float32, reflect.Float64:
+ return reflectDecodeFloat
+ case reflect.Ptr:
+ decf := b.decoderForSingleTypeReflect(t.Elem())
+ return func(rv reflect.Value, r io.Reader) error {
+ nv := reflect.New(t.Elem())
+ rv.Set(nv)
+ return decf(nv.Elem(), r)
+ }
+ case reflect.Slice:
+ // Special case handling for byte slices.
+ if t.Elem().Kind() == reflect.Uint8 {
+ return func(rv reflect.Value, r io.Reader) error {
Review comment:
Nit: This seems like a good candidate for a `reflectDecodeBytes` helper
function like the others above.
##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil.go
##########
@@ -0,0 +1,136 @@
+// Package testutil contains helpers to test and validate custom Beam Schema coders.
+package testutil
+
+import (
+ "bytes"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/google/go-cmp/cmp"
+)
+
+// SchemaCoder helps validate custom schema coders.
+type SchemaCoder struct {
+ encBldUT, encBldSchema coder.RowEncoderBuilder
+ decBldUT, decBldSchema coder.RowDecoderBuilder
+
+ // CmpOptions to pass into the round trip comparison
+ CmpOptions cmp.Options
+}
+
+// Register adds additional custom types not under test to both the under test
+// and default schema coders.
+func (v *SchemaCoder) Register(rt reflect.Type, encF, decF interface{}) {
+ v.encBldUT.Register(rt, encF)
+ v.encBldSchema.Register(rt, encF)
+ v.decBldUT.Register(rt, decF)
+ v.decBldSchema.Register(rt, decF)
+}
+
+// Validate is a test utility to validate custom schema coders generate
+// beam schema encoded bytes.
+//
+// Validate accepts the reflect.Type to register, factory functions for encoding and decoding, an
+// anonymous struct type equivalent to the encoded format produced and consumed by the factory produced functions
+// and test values. Values must be a single struct or pointer to struct.
+//
+// TODO(lostluck): Improve documentation.
+// TODO(lostluck): Abstract into a configurable struct, to handle
+//
+// Validate will register the under test factories and generate an encoder and decoder function.
+// These functions will be re-used for all test values. This emulates coders being re-used for all
+// elements within a bundle.
+//
+// Validate mutates the SchemaCoder, so the SchemaCoder may not be used more than once.
+func (v *SchemaCoder) Validate(t *testing.T, rt reflect.Type, encF, decF, schema interface{}, values interface{}) {
+ t.Helper()
+ testValues := reflect.ValueOf(values)
+ // Check whether we have a slice type or not.
+ if testValues.Type().Kind() != reflect.Slice {
+ vs := reflect.MakeSlice(reflect.SliceOf(testValues.Type()), 0, 1)
+ testValues = reflect.Append(vs, testValues)
+ }
+ if testValues.Len() == 0 {
+ t.Fatalf("No test values provided for ValidateSchemaCoder(%v)", rt)
+ }
+ // We now have non empty slice of test values!
+
+ v.encBldUT.Register(rt, encF)
+ v.decBldUT.Register(rt, decF)
+
+ testRt := testValues.Type().Elem()
+ encUT, err := v.encBldUT.Build(testRt)
+ if err != nil {
+ t.Fatalf("Unable to build encoder function with given factory: coder.RowEncoderBuilder.Build(%v) = %v, want nil error", rt, err)
+ }
+ decUT, err := v.decBldUT.Build(testRt)
+ if err != nil {
+ t.Fatalf("Unable to build decoder function with given factory: coder.RowDecoderBuilder.Build(%v) = %v, want nil error", rt, err)
+ }
+
+ schemaRt := reflect.TypeOf(schema)
+ encSchema, err := v.encBldSchema.Build(schemaRt)
+ if err != nil {
+ t.Fatalf("Unable to build encoder function for schema equivalent type: coder.RowEncoderBuilder.Build(%v) = %v, want nil error", rt, err)
+ }
+ decSchema, err := v.decBldSchema.Build(schemaRt)
+ if err != nil {
+ t.Fatalf("Unable to build decoder function for schema equivalent type: coder.RowDecoderBuilder.Build(%v) = %v, want nil error", rt, err)
+ }
+ for i := 0; i < testValues.Len(); i++ {
+ t.Run(fmt.Sprintf("%v[%d]", rt, i), func(t *testing.T) {
+ var buf bytes.Buffer
+ want := testValues.Index(i).Interface()
+ if err := encUT(want, &buf); err != nil {
+ t.Fatalf("error calling Under Test encoder[%v](%v) = %v", testRt, want, err)
+ }
+ initialBytes := clone(buf.Bytes())
+
+ bufSchema := bytes.NewBuffer(clone(initialBytes))
+
+ schemaV, err := decSchema(bufSchema)
+ if err != nil {
+ t.Fatalf("error calling Equivalent Schema decoder[%v]() = %v", schemaRt, err)
+ }
+ err = encSchema(schemaV, bufSchema)
Review comment:
Does reading from the buffer (a few lines above with `decSchema`) remove
those bytes from the buffer (or otherwise prevent those bytes from being
cloned)? Because on first read it seems like `roundTripBytes` below will
contain the element's bytes twice, the first copied from `initialBytes`, and
the second from `encSchema` here.
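For what it's worth, the standard library behavior in question can be checked with a small standalone sketch. `roundTrip` is a hypothetical helper, and it assumes the decoder consumes every byte of the element; if the real `decSchema` leaves bytes unread, those would still be in the buffer ahead of the re-encoded ones.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// roundTrip reads every byte out of a bytes.Buffer and then writes the same
// bytes back into it, mimicking decode followed by encode on one buffer.
func roundTrip(initial []byte) ([]byte, error) {
	buf := bytes.NewBuffer(append([]byte(nil), initial...))
	read := make([]byte, len(initial))
	// Reading drains the buffer: consumed bytes are no longer returned by Bytes().
	if _, err := io.ReadFull(buf, read); err != nil {
		return nil, err
	}
	// Writing appends after the (now empty) unread portion.
	if _, err := buf.Write(read); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := roundTrip([]byte("row"))
	fmt.Printf("%q %v\n", out, err)
}
```

Since `bytes.Buffer.Bytes` returns only the unread portion, a fully consumed buffer holds a single copy of the element after the re-encode, not two.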
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 527944)
Time Spent: 18h 50m (was: 18h 40m)
> [Go SDK] Beam Schemas
> ---------------------
>
> Key: BEAM-9615
> URL: https://issues.apache.org/jira/browse/BEAM-9615
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: P2
> Time Spent: 18h 50m
> Remaining Estimate: 0h
>
> Schema support is required for advanced cross language features in Beam, and
> has the opportunity to replace the current default JSON encoding of elements.
> Some quick notes, though a better fleshed out doc with details will be
> forthcoming:
> * All base coders should be implemented, and listed as coder capabilities. I
> think only stringutf8 is missing presently.
> * Should support fairly arbitrary user types, seamlessly. That is, users
> should be able to rely on it "just working" if their type is compatible.
> * Should support schema metadata tagging.
> In particular, one breaking shift in the default will be to explicitly fail
> pipelines if elements have unexported fields, when no other custom coder has
> been added. This has been a source of errors/dropped data/keys and a simple
> warning at construction time won't cut it. However, we could provide a manual
> "use beam schemas, but ignore unexported fields" registration as a
> workaround.
> Edit: Doc is now at https://s.apache.org/beam-go-schemas
--
This message was sent by Atlassian Jira
(v8.3.4#803005)