[
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=768016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768016
]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/May/22 16:58
Start Date: 09/May/22 16:58
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17579:
URL: https://github.com/apache/beam/pull/17579#discussion_r868223691
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
Teardown(ctx context.Context) error
}
+type createAccumulator0x1[T any] interface {
+ CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+ CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+ AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+ AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+ MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+ MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+ ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+ ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's
structural functions
+// and types and optimizes their runtime execution. There are 3 different
Accumulator
+// functions, each of which should be used for a different situation.
+{{if (eq $genericParams 1)}}// Accumulator1 should be used when your
accumulator, input, and output are all of the same type.
+// It can be called with register.Accumulator1[T](&CustomAccumulator{})
+// where T is the type of the input/accumulator/output.
+{{else}}{{if (eq $genericParams 2)}}// Accumulator2 should be used when your
accumulator, input, and output are 2 distinct types.
+// It can be called with register.Accumulator2[T1, T2](&CustomAccumulator{})
+// where T1 is the type of the accumulator and T2 is the other type.
+{{else}}// Accumulator3 should be used when your accumulator, input, and
output are 3 distinct types.
+// It can be called with register.Accumulator3[T1, T2,
T3](&CustomAccumulator{})
+// where T1 is the type of the accumulator, T2 is the type of the input, and
T3 is the type of the output.
+{{end}}{{end}}func Accumulator{{$genericParams}}[{{range $paramNum := upto
$genericParams}}{{if $paramNum}}, {{end}}T{{$paramNum}}{{end}} any](accum
interface{}) {
+ registerAccumulatorTypes(accum)
+ accumVal := reflect.ValueOf(accum)
+ var mergeAccumulatorsWrapper func(fn interface{}) reflectx.Func
+ if _, ok := accum.(mergeAccumulators2x2[T0]); ok {
+ caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(T0, T0) (T0, error))
+ return &caller2x2[T0, T0, T0, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0,
error))(nil)).Elem(), caller)
+
+ mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error)
{
+ return
fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1)
+ })
+ }
+ } else if _, ok := accum.(mergeAccumulators2x1[T0]); ok {
+ caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(T0, T0) T0)
+ return &caller2x1[T0, T0, T0]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0)
T0)(nil)).Elem(), caller)
+
+ mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
+ return
fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1)
+ })
+ }
+ }
+
+ if mergeAccumulatorsWrapper == nil{
Review Comment:
Done!
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
Teardown(ctx context.Context) error
}
+type createAccumulator0x1[T any] interface {
+ CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+ CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+ AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+ AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+ MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+ MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+ ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+ ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's
structural functions
Review Comment:
Yeah, that makes sense - updated!
Issue Time Tracking
-------------------
Worklog Id: (was: 768016)
Time Spent: 10.5h (was: 10h 20m)
> [Go SDK] Allow users to optimize DoFns with a single generic registration
> function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 10.5h
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator.
> This change allows them to use generics instead.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)