damccorm commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r860058545
##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
+{{$funcName := "unknown"}}{{$structName := "unknown"}}{{if (eq .func
"startBundle")}}{{$funcName = "startBundle"}}{{$structName =
"StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$funcName =
"finishBundle"}}{{$structName = "FinishBundle"}}{{end}}
+ {{$funcName}}In := -1
+ {{$funcName}}Out := -1
+ var {{$funcName}}Wrapper func(fn interface{}) reflectx.Func
+ {{$funcName}}Method :=
reflect.ValueOf(doFn).MethodByName("{{$structName}}")
+ if {{$funcName}}Method.IsValid() {
+ {{$funcName}}In = {{$funcName}}Method.Type().NumIn()
+ {{$funcName}}Out = {{$funcName}}Method.Type().NumOut()
+ switch {
+{{range $funcIn := upto 8}}
+ case {{$funcName}}In == {{$funcIn}}:
+ switch { {{range $funcOut := upto 2}}{{$possibleCombos :=
(possibleBundleLifecycleParameterCombos $funcIn $.processElementIn)}}{{if
$possibleCombos}}
+ case {{$funcName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first
= false}} {{else}} else {{end}}if _, ok :=
doFn.({{$funcName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join
$funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}});
ok {
+ {{$funcName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func({{(join $funcCombo ", ")}}){{if
$funcOut}} error{{end}})
+ return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn
$funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}},
{{end}}error{{end}}]{{end}}{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{(join
$funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(),
{{$funcName}}Caller)
+ {{$funcRegister := (makeStructRegisterEntry $funcName
$structName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister =
(makeStructRegisterEntry $funcName $structName $funcCombo (list
"error"))}}{{end}}
+ {{$funcName}}Wrapper = func(fn interface{}) reflectx.Func {
+ return {{$funcRegister}}
+ }
+ } {{end}}{{end}}{{end}}
+ default:
+ panic("Invalid signature for {{$structName}}")
+ }
+{{end}}
+ default:
+ panic("Invalid signature for {{$structName}}")
+ }
+ }
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
+
+import (
+ "context"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+{{$processElementMaxOut := 5}}{{$processElementMaxIn :=
9}}{{$startFinishBundleOutRange := 2}}{{$startFinishBundleInRange := 8}}{{range
$processElementOut := upto $processElementMaxOut}}{{range $processElementIn :=
upto $processElementMaxIn}}
+type
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut true)}} interface {
+ ProcessElement({{range $in := upto $processElementIn}}{{if $in}},
{{end}}i{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+type
caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut true)}} struct {
+ fn func({{range $in := upto $processElementIn}}{{if $in}},
{{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Call(args []interface{})
[]interface{} {
+ {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if
$out}}, {{end}}out{{$out}}{{end}} := {{end}}c.fn({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}args[{{$in}}].(I{{$in}}){{end}})
+ return []interface{}{ {{if $processElementOut}}{{range $out := upto
$processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}}{{end}} }
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}})
Call{{$processElementIn}}x{{$processElementOut}}({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}arg{{$in}} I{{$in}}{{end}}){{if
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}},
{{end}}interface{}{{end}}){{end}} {
+ {{if $processElementOut}}return {{end}}c.fn({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}arg{{$in}}{{end}})
+}
+
+func
registerDoFn{{$processElementIn}}x{{$processElementOut}}StructWrappersAndFuncs{{(genericTypingRepresentation
$processElementIn $processElementOut true)}}(doFn
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) {
+ processElementCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func({{range $in := upto $processElementIn}}{{if
$in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})
+ return
&caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}}
({{range $out := upto $processElementOut}}{{if $out}},
{{end}}R{{$out}}{{end}}){{end}})(nil)).Elem(), processElementCaller)
+ processElementWrapper := func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}a{{$in}} I{{$in}}{{end}}){{if
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}},
{{end}}R{{$out}}{{end}}){{end}} {
+ {{if $processElementOut}}return
{{end}}fn.(doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}).ProcessElement({{range $in :=
upto $processElementIn}}{{if $in}}, {{end}}a{{$in}}{{end}})
+ })
+ }{{template "StructWrappersAndFuncs_StartFinishBundle" (dict
"processElementIn" $processElementIn "processElementOut" $processElementOut
"func" "startBundle")}}{{template "StructWrappersAndFuncs_StartFinishBundle"
(dict "processElementIn" $processElementIn "processElementOut"
$processElementOut "func" "finishBundle")}}
+ var setupWrapper func(fn interface{}) reflectx.Func
+ if _, ok := doFn.(setup0x0); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func())
+ return &caller0x0{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() {
+ fn.(setup0x0).Setup()
+ })
+ }
+ } else if _, ok := doFn.(setup1x0); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context))
+ return &caller1x0[context.Context]{fn: f}
+ }
+
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) {
+ fn.(setup1x0).Setup(a0)
+ })
+ }
+ } else if _, ok := doFn.(setup0x1); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func() error)
+ return &caller0x1[error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() error {
+ return fn.(setup0x1).Setup()
+ })
+ }
+ } else if _, ok := doFn.(setup1x1); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context) error)
+ return &caller1x1[context.Context, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context)
error)(nil)).Elem(), setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) error {
+ return fn.(setup1x1).Setup(a0)
+ })
+ }
+ }
+ var teardownWrapper func(fn interface{}) reflectx.Func
+ if _, ok := doFn.(teardown0x0); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func())
+ return &caller0x0{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() {
+ fn.(teardown0x0).Teardown()
+ })
+ }
+ } else if _, ok := doFn.(teardown1x0); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context))
+ return &caller1x0[context.Context]{fn: f}
+ }
+
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) {
+ fn.(teardown1x0).Teardown(a0)
+ })
+ }
+ } else if _, ok := doFn.(teardown0x1); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func() error)
+ return &caller0x1[error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() error {
+ return fn.(teardown0x1).Teardown()
+ })
+ }
+ } else if _, ok := doFn.(teardown1x1); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context) error)
+ return &caller1x1[context.Context, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context)
error)(nil)).Elem(), teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) error {
+ return fn.(teardown1x1).Teardown(a0)
+ })
+ }
+ }
+ wrapperFn := func(fn interface{}) map[string]reflectx.Func {
+ m := map[string]reflectx.Func{}
+ if processElementWrapper != nil {
+ m["ProcessElement"] = processElementWrapper(fn)
+ }
+ if startBundleWrapper != nil {
+ m["StartBundle"] = startBundleWrapper(fn)
+ }
+ if finishBundleWrapper != nil {
+ m["FinishBundle"] = finishBundleWrapper(fn)
+ }
+ if setupWrapper != nil {
+ m["Setup"] = setupWrapper(fn)
+ }
+ if teardownWrapper != nil {
+ m["Teardown"] = teardownWrapper(fn)
+ }
+
+ return m
+ }
Review Comment:
This was not actually the big one that I was considering, though I did
change it as well.
> Do recall that if a helper doesn't need arity assistance, it may be able
to be put into a non-generated file in the same package, and tested normally.
That brings up an interesting style question - I had considered doing this
initially, but decided to keep it in the template file because logically it
belongs with the other functions in registration.go. Stylistically, is it more
typical to move functions like this out of the template file or to keep them
with the functions they logically go with? It doesn't really matter for this
case at this point, but I'm curious about the principle.
##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,133 @@ func upto(i int) []int {
}
return ret
}
+
+func add(i int, j int) int { // add returns the sum i + j.
+ return i + j
+}
+
+func mult(i int, j int) int { // mult returns the product i * j.
+ return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} { // dict builds a map from alternating key, value arguments.
+ dict := make(map[string]interface{}, len(values)/2) // sized for one entry per key/value pair
+ if len(values)%2 != 0 { // an odd count means a key was passed without a value
+ panic("Invalid dictionary call")
+ }
+ for i := 0; i < len(values); i += 2 { // even index is the key, the following index is its value
+ dict[values[i].(string)] = values[i+1] // unchecked assertion: panics if a key is not a string
+ }
+
+ return dict
+}
+
+func list(values ...string) []string { // list returns its variadic string arguments as a slice.
+ return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string { // Builds a bracketed name list, e.g. "[I0, I1, R0 any]".
+ seenElements := false // becomes true once the opening "[" has been written
+ typing := ""
+ if in > 0 {
+ typing += fmt.Sprintf("[I%v", 0) // I0 opens the bracket
+ for i := 1; i < in; i++ {
+ typing += fmt.Sprintf(", I%v", i)
+ }
+ seenElements = true
+ }
+ if out > 0 {
+ i := 0
+ if !seenElements { // no inputs were written, so R0 opens the bracket instead of following a comma
+ typing += fmt.Sprintf("[R%v", 0)
+ i++
+ }
+ for i < out { // remaining R names are comma-separated
+ typing += fmt.Sprintf(", R%v", i)
+ i++
+ }
+ seenElements = true
+ }
+
+ if seenElements { // when neither branch above ran, the result stays ""
+ if includeType {
+ typing += " any" // appended once, after the last name
+ }
+ typing += "]"
+ }
+
+ return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{},
processElementInInterface interface{}) [][]string {
+ numIn := numInInterface.(int)
+ processElementIn := processElementInInterface.(int)
+ ordered_known_parameter_options := []string{"context.Context",
"typex.PaneInfo", "[]typex.Window", "typex.EventTime",
"typex.BundleFinalization"}
+ // Because of how Bundle lifecycle functions are invoked, all known
parameters must precede unknown options and be in order.
+ // Once we hit an unknown option, all remaining unknown options must
be included since all iters/emitters must be included
+ // Therefore, we can generate a powerset of the known options and fill
out any remaining parameters with an ordered set of remaining unknown options
+ pSetSize := int(math.Pow(2,
float64(len(ordered_known_parameter_options))))
+ combos := make([][]string, 0, pSetSize)
+
+ var index int
+ for index < pSetSize {
Review Comment:
There's not - this was a consequence of an evolving approach and I missed
this bit of cleanup - thanks
##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
Review Comment:
That's a fair concern - I primarily chose the main package because it is
where our existing registration functions live; I don't think that is worth the
cost to the docs, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]