lostluck commented on a change in pull request #16775:
URL: https://github.com/apache/beam/pull/16775#discussion_r802187655



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makeWindowedCoder() *coder.Coder {
+       vCoder := coder.NewDouble()
+       return coder.NewW(vCoder, coder.NewGlobalWindow())
+}
+
+func makeWindowedKVCoder() *coder.Coder {
+       kCoder := coder.NewString()
+       vCoder := coder.NewDouble()
+       kvCoder := coder.NewKV([]*coder.Coder{kCoder, vCoder})
+       return coder.NewW(kvCoder, coder.NewGlobalWindow())
+}
+
+func TestNewSideInputAdapter(t *testing.T) {
+       tests := []struct {
+               name        string
+               sid         StreamID
+               sideInputID string
+               c           *coder.Coder
+               kc          ElementEncoder
+               ec          ElementDecoder
+       }{
+               {
+                       name:        "KV coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedKVCoder(),
+                       kc:          &stringEncoder{},
+                       ec:          &doubleDecoder{},
+               },
+               {
+                       name:        "V coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedCoder(),
+                       kc:          nil,
+                       ec:          &doubleDecoder{},
+               },
+       }
+       for _, test := range tests {
+               adapter := NewSideInputAdapter(test.sid, test.sideInputID, 
test.c, nil)
+               adapterStruct, ok := adapter.(*sideInputAdapter)
+               if !ok {
+                       t.Errorf("failed to convert interface to 
sideInputAdapter struct in test %v", test)
+               }
+               if got, want := adapterStruct.sid, test.sid; got != want {
+                       t.Errorf("got SID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.sideInputID, test.sideInputID; 
got != want {
+                       t.Errorf("got sideInputID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.c, test.c; got != want {
+                       t.Errorf("got coder %v, want %v", got, want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.kc), 
reflect.TypeOf(test.kc); got != want {
+                       t.Errorf("got ElementEncoder type %v, want %v", got, 
want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.ec), 
reflect.TypeOf(test.ec); got != want {
+                       t.Errorf("got ElementDecoder type %v, want %v", got, 
want)
+               }

Review comment:
       Consider wrapping this in t.Run so that failures will be more 
identifiable to the test case. As it stands, if something failed in two cases, 
it might not be easy to identify where they came from. Further, see 
https://github.com/golang/go/wiki/TestComments#got-before-want and 
https://github.com/golang/go/wiki/TestComments#table-driven-tests-vs-multiple-test-functions
   
   ```suggestion
                t.Run(test.name, func(t *testing.T) {
                                adapter := NewSideInputAdapter(test.sid, 
test.sideInputID, test.c, nil)
                                adapterStruct, ok := adapter.(*sideInputAdapter)
                                if !ok {
                                        t.Errorf("failed to convert interface 
to sideInputAdapter struct in test %v", test)
                                }
                                if got, want := adapterStruct.sid, test.sid; 
got != want {
                                        t.Errorf("got SID %v, want %v", got, 
want)
                                }
                                if got, want := adapterStruct.sideInputID, 
test.sideInputID; got != want {
                                        t.Errorf("got sideInputID %v, want %v", 
got, want)
                                }
                                if got, want := adapterStruct.c, test.c; got != 
want {
                                        t.Errorf("got coder %v, want %v", got, 
want)
                                }
                                if got, want := 
reflect.TypeOf(adapterStruct.kc), reflect.TypeOf(test.kc); got != want {
                                        t.Errorf("got ElementEncoder type %v, 
want %v", got, want)
                                }
                                if got, want := 
reflect.TypeOf(adapterStruct.ec), reflect.TypeOf(test.ec); got != want {
                                        t.Errorf("got ElementDecoder type %v, 
want %v", got, want)
                                }
                })
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -418,7 +379,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
([]string, error) {
 
                                si[fmt.Sprintf("i%v", i)] = &pipepb.SideInput{
                                        AccessPattern: &pipepb.FunctionSpec{
-                                               Urn: URNMultimapSideInput,
+                                               Urn: URNIterableSideInput,
                                        },
                                        ViewFn: &pipepb.FunctionSpec{
                                                Urn: "foo",

Review comment:
       In this case, the ViewFn URN is "required" by some runner implementations 
to be non-empty, even if the proto semantics don't explicitly require it.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makeWindowedCoder() *coder.Coder {
+       vCoder := coder.NewDouble()
+       return coder.NewW(vCoder, coder.NewGlobalWindow())
+}
+
+func makeWindowedKVCoder() *coder.Coder {
+       kCoder := coder.NewString()
+       vCoder := coder.NewDouble()
+       kvCoder := coder.NewKV([]*coder.Coder{kCoder, vCoder})
+       return coder.NewW(kvCoder, coder.NewGlobalWindow())
+}
+
+func TestNewSideInputAdapter(t *testing.T) {
+       tests := []struct {
+               name        string
+               sid         StreamID
+               sideInputID string
+               c           *coder.Coder
+               kc          ElementEncoder
+               ec          ElementDecoder
+       }{
+               {
+                       name:        "KV coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedKVCoder(),
+                       kc:          &stringEncoder{},
+                       ec:          &doubleDecoder{},
+               },
+               {
+                       name:        "V coder",
+                       sid:         StreamID{Port: Port{URL: 
"localhost:8099"}, PtransformID: "n0"},
+                       sideInputID: "i0",
+                       c:           makeWindowedCoder(),
+                       kc:          nil,
+                       ec:          &doubleDecoder{},
+               },
+       }
+       for _, test := range tests {
+               adapter := NewSideInputAdapter(test.sid, test.sideInputID, 
test.c, nil)
+               adapterStruct, ok := adapter.(*sideInputAdapter)
+               if !ok {
+                       t.Errorf("failed to convert interface to 
sideInputAdapter struct in test %v", test)
+               }
+               if got, want := adapterStruct.sid, test.sid; got != want {
+                       t.Errorf("got SID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.sideInputID, test.sideInputID; 
got != want {
+                       t.Errorf("got sideInputID %v, want %v", got, want)
+               }
+               if got, want := adapterStruct.c, test.c; got != want {
+                       t.Errorf("got coder %v, want %v", got, want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.kc), 
reflect.TypeOf(test.kc); got != want {
+                       t.Errorf("got ElementEncoder type %v, want %v", got, 
want)
+               }
+               if got, want := reflect.TypeOf(adapterStruct.ec), 
reflect.TypeOf(test.ec); got != want {
+                       t.Errorf("got ElementDecoder type %v, want %v", got, 
want)
+               }
+       }
+}
+
+func TestNewKeyedIterable_Unkeyed(t *testing.T) {
+       adapter := NewSideInputAdapter(StreamID{}, "", makeWindowedCoder(), nil)
+       rs, err := adapter.NewKeyedIterable(context.Background(), nil, nil, nil)
+       if err == nil {
+               t.Error("NewKeyedIterable() succeeded when it should have 
failed")
+       }
+       if rs != nil {
+               t.Errorf("NewKeyedIterable() returned a ReStream when it should 
not have, got %v", rs)

Review comment:
       I don't love a plain constructor-only test. Ideally we would be faking 
out the cache or state reader behavior, but that feels like a bit much. We'd 
probably want some additional test infrastructure for easier-to-use 
test ReStreams and a test io.ReadCloser to validate the other half of things. 
Probably better to punt to another PR. (We have some existing utilities in 
another test file in the exec package.)

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -43,26 +43,71 @@ type sideInputAdapter struct {
        kc          ElementEncoder
        ec          ElementDecoder
        wm          WindowMapper
+       c           *coder.Coder
 }
 
 // NewSideInputAdapter returns a side input adapter for the given StreamID and 
coder.
-// It expects a W<KV<K,V>> coder, because the protocol supports MultiSet 
access only.
+// It expects a W<V> or W<KV<K,V>> coder, because the protocol requires 
windowing information.
 func NewSideInputAdapter(sid StreamID, sideInputID string, c *coder.Coder, wm 
WindowMapper) SideInputAdapter {
-       if !coder.IsW(c) || !coder.IsKV(coder.SkipW(c)) {
-               panic(fmt.Sprintf("expected WKV coder for side input %v: %v", 
sid, c))
+       if !coder.IsW(c) {
+               panic(fmt.Sprintf("expected WV coder for side input %v: %v", 
sid, c))
        }
 
        wc := MakeWindowEncoder(c.Window)
-       kc := MakeElementEncoder(coder.SkipW(c).Components[0])
-       ec := MakeElementDecoder(coder.SkipW(c).Components[1])
-       return &sideInputAdapter{sid: sid, sideInputID: sideInputID, wc: wc, 
kc: kc, ec: ec, wm: wm}
+       var kc ElementEncoder
+       var ec ElementDecoder
+       if coder.IsKV(coder.SkipW(c)) {
+               kc = MakeElementEncoder(coder.SkipW(c).Components[0])
+               ec = MakeElementDecoder(coder.SkipW(c).Components[1])
+       } else {
+               ec = MakeElementDecoder(coder.SkipW(c))

Review comment:
       Good explanation!
   In particular, for MultiMap side inputs, a user requests the values for a 
given key, which we then encode with the window, and we return the 
unwindowed values as the user iterates.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to