camphillips22 opened a new issue, #24774:
URL: https://github.com/apache/beam/issues/24774

   ### What happened?
   
   Add this test to the pipeline tester:
   ```golang
   func TestRunner_Pipelines(t *testing.T) {
           // ...
        t.Run("sideinput_multimap", func(t *testing.T) {
                p, s := beam.NewPipelineWithRoot()
                imp := beam.Impulse(s)
                col1 := beam.ParDo(s, dofnKV, imp)
                keys := filter.Distinct(s, beam.DropValue(s, col1))
                ks, sum := beam.ParDo2(s, dofnMultiMap, keys, 
beam.SideInput{Input: col1})
                beam.ParDo(s, &stringCheck{
                        Name: "iterKV sideinput check K",
                        Want: []string{"a", "b"},
                }, ks)
                beam.ParDo(s, &int64Check{
                        Name: "iterKV sideinput check V",
                        Want: []int{9, 12},
                }, sum)
                if _, err := executeWithT(context.Background(), t, p); err != 
nil {
                        t.Fatal(err)
                }
        })
           //...
   }
   
   func dofnMultiMap(key string, lookup func(string) func(*int64) bool, emitK 
func(string), emitV func(int64)) {
        var v, sum int64
        iter := lookup(key)
        for iter(&v) {
                sum += v
        }
        emitK(key)
        emitV(sum)
   }
   ```
   
   And you get this error:
   
   ```
   2022/12/23 14:26:27 wait[6] unblocked w/ 2 [true]
       direct_test.go:476: panic: reflect.Set: value of type string is not 
assignable to type int64
           Full error:
           while executing FinishBundle for Plan[plan]:
           16: Impulse[0]
           1: Discard
           2: ParDo[direct.stringCheck] Out:[1] Sig: func(string, func(string))
           3: Discard
           4: ParDo[direct.int64Check] Out:[3] Sig: func(int64, func(int64))
           5: ParDo[direct.dofnMultiMap] Out:[2 4] Sig: func(string, 
func(string) func(*int64) bool, func(string), func(int64))
           6: wait[1] Out:5
           7: buffer[7]. wait:6 Out:5
           8: ParDo[beam.dropValueFn] Out:[6] Sig: func(typex.X, typex.Y) 
typex.X
           9: Combine[filter.mergeFn] Keyed:false Out:8
           10: CoGBK. Out:9
           11: Inject[0]. Out:10
           12: ParDo[filter.mapFn] Out:[11] Sig: func(typex.T) (typex.T, int)
           13: ParDo[beam.dropValueFn] Out:[12] Sig: func(typex.X, typex.Y) 
typex.X
           14: Multiplex. Out:[13 7]
           15: ParDo[direct.dofnKV] Out:[14] Sig: func([]uint8, func(string, 
int64))
                caused by:
           panic: reflect.Set: value of type string is not assignable to type 
int64 goroutine 21 [running]:
   ```
   
   The runner is trying to put the integer (the value in the multi map) into a 
string.
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to