damccorm commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r860058545


##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
+{{$funcName := "unknown"}}{{$structName := "unknown"}}{{if (eq .func 
"startBundle")}}{{$funcName = "startBundle"}}{{$structName = 
"StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$funcName = 
"finishBundle"}}{{$structName = "FinishBundle"}}{{end}}
+       {{$funcName}}In := -1
+       {{$funcName}}Out := -1
+    var {{$funcName}}Wrapper func(fn interface{}) reflectx.Func
+       {{$funcName}}Method := 
reflect.ValueOf(doFn).MethodByName("{{$structName}}")
+       if {{$funcName}}Method.IsValid() {
+               {{$funcName}}In = {{$funcName}}Method.Type().NumIn()
+               {{$funcName}}Out = {{$funcName}}Method.Type().NumOut()
+           switch {
+{{range $funcIn := upto 8}}
+    case {{$funcName}}In == {{$funcIn}}:
+            switch { {{range $funcOut := upto 2}}{{$possibleCombos := 
(possibleBundleLifecycleParameterCombos $funcIn $.processElementIn)}}{{if 
$possibleCombos}}
+            case {{$funcName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first 
= false}}                {{else}} else {{end}}if _, ok := 
doFn.({{$funcName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join 
$funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); 
ok {
+                    {{$funcName}}Caller := func(fn interface{}) reflectx.Func {
+                        f := fn.(func({{(join $funcCombo ", ")}}){{if 
$funcOut}} error{{end}})
+                        return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn 
$funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, 
{{end}}error{{end}}]{{end}}{fn: f}
+                    }
+                    reflectx.RegisterFunc(reflect.TypeOf((*func({{(join 
$funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), 
{{$funcName}}Caller)
+                    {{$funcRegister := (makeStructRegisterEntry $funcName 
$structName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = 
(makeStructRegisterEntry $funcName $structName $funcCombo (list 
"error"))}}{{end}}
+                    {{$funcName}}Wrapper = func(fn interface{}) reflectx.Func {
+                        return {{$funcRegister}}
+                    }
+                } {{end}}{{end}}{{end}}
+            default:
+                panic("Invalid signature for {{$structName}}")
+            }
+{{end}}
+        default:
+            panic("Invalid signature for {{$structName}}")
+        }
+    }
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
+
+import (
+       "context"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+{{$processElementMaxOut := 5}}{{$processElementMaxIn := 
9}}{{$startFinishBundleOutRange := 2}}{{$startFinishBundleInRange := 8}}{{range 
$processElementOut := upto $processElementMaxOut}}{{range $processElementIn := 
upto $processElementMaxIn}}
+type 
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation 
$processElementIn $processElementOut true)}} interface {
+    ProcessElement({{range $in := upto $processElementIn}}{{if $in}}, 
{{end}}i{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+type 
caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut true)}} struct {
+    fn func({{range $in := upto $processElementIn}}{{if $in}}, 
{{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Name() string {
+       return reflectx.FunctionName(c.fn)
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Type() reflect.Type {
+       return reflect.TypeOf(c.fn)
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Call(args []interface{}) 
[]interface{} {
+    {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if 
$out}}, {{end}}out{{$out}}{{end}} := {{end}}c.fn({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}args[{{$in}}].(I{{$in}}){{end}})
+       return []interface{}{ {{if $processElementOut}}{{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}}{{end}} }
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) 
Call{{$processElementIn}}x{{$processElementOut}}({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}arg{{$in}} I{{$in}}{{end}}){{if 
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}interface{}{{end}}){{end}} {
+    {{if $processElementOut}}return {{end}}c.fn({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}arg{{$in}}{{end}})
+}
+
+func 
registerDoFn{{$processElementIn}}x{{$processElementOut}}StructWrappersAndFuncs{{(genericTypingRepresentation
 $processElementIn $processElementOut true)}}(doFn 
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation 
$processElementIn $processElementOut false)}}) {
+    processElementCaller := func(fn interface{}) reflectx.Func {
+               f := fn.(func({{range $in := upto $processElementIn}}{{if 
$in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})
+               return 
&caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}{fn: f}
+       }
+       reflectx.RegisterFunc(reflect.TypeOf((*func({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} 
({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}R{{$out}}{{end}}){{end}})(nil)).Elem(), processElementCaller)
+    processElementWrapper := func(fn interface{}) reflectx.Func {
+        return reflectx.MakeFunc(func({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}a{{$in}} I{{$in}}{{end}}){{if 
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}R{{$out}}{{end}}){{end}} {
+            {{if $processElementOut}}return 
{{end}}fn.(doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}).ProcessElement({{range $in := 
upto $processElementIn}}{{if $in}}, {{end}}a{{$in}}{{end}})
+        })
+    }{{template "StructWrappersAndFuncs_StartFinishBundle" (dict 
"processElementIn" $processElementIn "processElementOut" $processElementOut 
"func" "startBundle")}}{{template "StructWrappersAndFuncs_StartFinishBundle" 
(dict "processElementIn" $processElementIn "processElementOut" 
$processElementOut "func" "finishBundle")}}
+    var setupWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(setup0x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(setup0x0).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(setup1x0).Setup(a0)
+            })
+        }
+    } else if _, ok := doFn.(setup0x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(setup0x1).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) 
error)(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(setup1x1).Setup(a0)
+            })
+        }
+    }
+    var teardownWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(teardown0x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(teardown0x0).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(teardown1x0).Teardown(a0)
+            })
+        }
+    } else if _, ok := doFn.(teardown0x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(teardown0x1).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) 
error)(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(teardown1x1).Teardown(a0)
+            })
+        }
+    }
+    wrapperFn := func(fn interface{}) map[string]reflectx.Func {
+        m := map[string]reflectx.Func{}
+        if processElementWrapper != nil {
+            m["ProcessElement"] = processElementWrapper(fn)
+        }
+        if startBundleWrapper != nil {
+            m["StartBundle"] = startBundleWrapper(fn)
+        }
+        if finishBundleWrapper != nil {
+            m["FinishBundle"] = finishBundleWrapper(fn)
+        }
+        if setupWrapper != nil {
+            m["Setup"] = setupWrapper(fn)
+        }
+        if teardownWrapper != nil {
+            m["Teardown"] = teardownWrapper(fn)
+        }
+        
+        return m
+    }

Review Comment:
   This was not actually the big one that I was considering, though I did 
change it as well.
   
   > Do recall that if a helper doesn't need arity assistance, it may be able 
to be put into a non-generated file in the same package, and tested normally.
   
   That brings up an interesting style question - I had considered doing this 
initially, but decided to keep it in the template file because logically it 
belongs with the other functions in registration.go. Stylistically, is it more 
typical to move functions like this out of the template file or to keep them 
with the functions they logically go with? Doesn't really matter for this case 
at this point, but I'm curious about the principle



##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,133 @@ func upto(i int) []int {
        }
        return ret
 }
+
+func add(i int, j int) int {
+       return i + j
+}
+
+func mult(i int, j int) int {
+       return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+       dict := make(map[string]interface{}, len(values)/2)
+       if len(values)%2 != 0 {
+               panic("Invalid dictionary call")
+       }
+       for i := 0; i < len(values); i += 2 {
+               dict[values[i].(string)] = values[i+1]
+       }
+
+       return dict
+}
+
+func list(values ...string) []string {
+       return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+       seenElements := false
+       typing := ""
+       if in > 0 {
+               typing += fmt.Sprintf("[I%v", 0)
+               for i := 1; i < in; i++ {
+                       typing += fmt.Sprintf(", I%v", i)
+               }
+               seenElements = true
+       }
+       if out > 0 {
+               i := 0
+               if !seenElements {
+                       typing += fmt.Sprintf("[R%v", 0)
+                       i++
+               }
+               for i < out {
+                       typing += fmt.Sprintf(", R%v", i)
+                       i++
+               }
+               seenElements = true
+       }
+
+       if seenElements {
+               if includeType {
+                       typing += " any"
+               }
+               typing += "]"
+       }
+
+       return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, 
processElementInInterface interface{}) [][]string {
+       numIn := numInInterface.(int)
+       processElementIn := processElementInInterface.(int)
+       ordered_known_parameter_options := []string{"context.Context", 
"typex.PaneInfo", "[]typex.Window", "typex.EventTime", 
"typex.BundleFinalization"}
+       // Because of how Bundle lifecycle functions are invoked, all known 
parameters must precede unknown options and be in order.
+       // Once we hit an unknown option, all remaining unknown options must 
be included since all iters/emitters must be included
+       // Therefore, we can generate a powerset of the known options and fill 
out any remaining parameters with an ordered set of remaining unknown options
+       pSetSize := int(math.Pow(2, 
float64(len(ordered_known_parameter_options))))
+       combos := make([][]string, 0, pSetSize)
+
+       var index int
+       for index < pSetSize {

Review Comment:
   There's not - this was a consequence of an evolving approach and I missed 
this bit of cleanup - thanks



##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}

Review Comment:
   That's a fair concern - I primarily chose the main package because it is 
where our existing registration functions live; I don't think that is worth the 
cost to the docs, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to