[
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=767426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767426
]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/22 21:45
Start Date: 06/May/22 21:45
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17574:
URL: https://github.com/apache/beam/pull/17574#discussion_r867202050
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+ return e.fn
Review Comment:
So the original generated code used "anchor" types, emitNative and
iterNative, and added methods for whatever types were generated. The method
appropriate to the factory function was then assigned to the internal function
pointer, to reduce the generated per-type boilerplate somewhat. You can see this in the ["optimized"
package](https://github.com/apache/beam/blob/51067c123204a4951de3cba858edc96da354292b/sdks/go/pkg/beam/core/runtime/exec/optimized/emitters.go#L1072).
That package is quite legacy and handles the built-ins, but ultimately it would
only train users to assume the built-ins are fast, and nothing else. All this
work makes things fast for everyone, and lets us train users for it properly.
We don't need that approach anymore with generics. Instead, return
`e.invoke` directly rather than assigning it to an intermediate field. Then we
can also get rid of the `fn` field. This applies here and throughout.
It shouldn't be any worse either, since `interface{}` values would be copied by
value anyway.
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
Review Comment:
File name nit: feel free to split this into two files, emitters.go and
iterators.go. Don't repeat the package name in the file name itself.
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+func (e *emitNative1[T]) invoke(val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative1WithTimestamp[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1WithTimestamp[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative1WithTimestamp[T]) invoke(et typex.EventTime, val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2WithTimestamp[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) invoke(et typex.EventTime, key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+// RegisterEmitter1 registers parameters from your DoFn with a
+// signature func(T) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter1[T]((*func(T))(nil))
+func RegisterEmitter1[T1 any](e *func(T1)) {
Review Comment:
Please drop the Register prefix from all the functions here. As in your
example, users will already have `registration` as a prefix, so
`registration.Emitter1` is more readable.
Hmm, it's also not too late to just call it the `register` package...
but that's definitely a change for another PR.
I suspect this is a holdover from when it was in the `beam` package.
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+func RegisterEmitter1[T1 any](e *func(T1)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative1[T1]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter2 registers parameters from your DoFn with a
+// signature func(T1, T2) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil))
+func RegisterEmitter2[T1, T2 any](e *func(T1, T2)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative2[T1, T2]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ if reflect.TypeOf(e).Elem().In(0) == typex.EventTimeType {
+ registerFunc = func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative1WithTimestamp[T2]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter3 registers parameters from your DoFn with a
+// signature func(T1, T2, T3) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter3[T1, T2, T3]((*func(T1, T2, T3))(nil))
+func RegisterEmitter3[T1, T2, T3 any](e *func(T1, T2, T3)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative2WithTimestamp[T2, T3]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+type iterNative1[T any] struct {
+ s exec.ReStream
+ fn interface{}
+
+ // cur is the "current" stream, if any.
+ cur exec.Stream
+}
+
+func (v *iterNative1[T]) Init() error {
+ cur, err := v.s.Open()
+ if err != nil {
+ return err
+ }
+ v.cur = cur
+ return nil
+}
+
+func (v *iterNative1[T]) Value() interface{} {
+ return v.fn
+}
+
+func (v *iterNative1[T]) Reset() error {
+ if err := v.cur.Close(); err != nil {
+ return err
+ }
+ v.cur = nil
+ return nil
+}
+
+func (v *iterNative1[T]) invoke(value *T) bool {
+ elm, err := v.cur.Read()
+ if err != nil {
+ if err == io.EOF {
+ return false
+ }
+ panic(fmt.Sprintf("broken stream: %v", err))
+ }
+ *value = elm.Elm.(T)
+ return true
+}
+
+type iterNative2[T1, T2 any] struct {
+ s exec.ReStream
+ fn interface{}
+
+ // cur is the "current" stream, if any.
+ cur exec.Stream
+}
+
+func (v *iterNative2[T1, T2]) Init() error {
+ cur, err := v.s.Open()
+ if err != nil {
+ return err
+ }
+ v.cur = cur
+ return nil
+}
+
+func (v *iterNative2[T1, T2]) Value() interface{} {
+ return v.fn
+}
+
+func (v *iterNative2[T1, T2]) Reset() error {
+ if err := v.cur.Close(); err != nil {
+ return err
+ }
+ v.cur = nil
+ return nil
+}
+
+func (v *iterNative2[T1, T2]) invoke(key *T1, value *T2) bool {
+ elm, err := v.cur.Read()
+ if err != nil {
+ if err == io.EOF {
+ return false
+ }
+ panic(fmt.Sprintf("broken stream: %v", err))
+ }
+ *key = elm.Elm.(T1)
+ *value = elm.Elm2.(T2)
+ return true
+}
+
+// RegisterIter1 registers parameters from your DoFn with a
+// signature func(*T) bool and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterIter1[T]((*func(*T) bool)(nil))
+func RegisterIter1[T any](i *func(*T) bool) {
Review Comment:
We may want to consider adding something here for map side input functions,
`func(K) func(*V) bool`, and "re-iterators", `func() func(*V) bool` (and the KV
versions).
That's not critical for this pass, since we don't have the registration
scaffolding set up for those, and I don't think there's a great need for it:
those typically have high RPC overhead anyway, as they fetch and refetch
the data.
We can always benchmark and add them later if needed.
The reflective versions are here:
https://github.com/apache/beam/blob/51067c123204a4951de3cba858edc96da354292b/sdks/go/pkg/beam/core/runtime/exec/input.go#L71
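To make the two signature shapes concrete, here is a minimal in-memory sketch. It is not Beam's implementation (the reflective versions are in the input.go file linked in the comment); `replay`, `reIter`, `mapSide`, and `sum` are hypothetical names, and `replay` stands in for a re-openable `exec.ReStream`.

```go
package main

import "fmt"

// replay is a hypothetical, in-memory stand-in for an exec.ReStream: it can
// be reopened any number of times and yields the same elements on each pass.
type replay[V any] struct{ data []V }

// open starts a fresh pass and exposes it as a func(*V) bool iterator.
func (r replay[V]) open() func(*V) bool {
	i := 0
	return func(v *V) bool {
		if i >= len(r.data) {
			return false
		}
		*v = r.data[i]
		i++
		return true
	}
}

// reIter builds the "re-iterator" shape func() func(*V) bool: every call
// starts a new pass over the data.
func reIter[V any](r replay[V]) func() func(*V) bool {
	return r.open
}

// mapSide builds the map side input shape func(K) func(*V) bool: each lookup
// returns an iterator over that key's values (empty if the key is absent).
func mapSide[K comparable, V any](m map[K][]V) func(K) func(*V) bool {
	return func(k K) func(*V) bool {
		return replay[V]{data: m[k]}.open()
	}
}

// sum drains one iterator and totals its values.
func sum(it func(*int) bool) int {
	total := 0
	var v int
	for it(&v) {
		total += v
	}
	return total
}

func main() {
	next := reIter(replay[int]{data: []int{1, 2, 3}})
	fmt.Println(sum(next()), sum(next())) // 6 6: each pass starts over

	lookup := mapSide(map[string][]int{"a": {4, 5}})
	fmt.Println(sum(lookup("a")), sum(lookup("b"))) // 9 0
}
```

In a runner, each call to the outer function would typically reopen or refetch the underlying data, which is the RPC overhead the comment refers to.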
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+func RegisterIter1[T any](i *func(*T) bool) {
+ registerFunc := func(s exec.ReStream) exec.ReusableInput {
+ ret := &iterNative1[T]{s: s}
+ ret.fn = ret.invoke
+ return ret
+ }
+ exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+}
+
+// RegisterIter1 registers parameters from your DoFn with a
+// signature func(*T1, *T2) bool and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterIter2[T1, T2]((*func(*T1, *T2) bool)(nil))
Review Comment:
What do you think about dropping the function-pointer parameter (e.g. `e *func(T1)`)?
1. Users aren't going to have an iterator/emitter value around, so they'll need to
specify the func pointer setup like you've got.
2. Even assuming type inference works, it's still annoying to go through
the awkward (*blah)(nil) dance.
https://go.dev/play/p/quG3c5scpGQ
Inference does seem to work, at least. We do get compile-time safety with
this approach, but it also prevents us from automatically supporting any
"new" iterator or emitter formats in the future with the same registration
calls.
What do you think?
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+type emitNative2WithTimestamp[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2WithTimestamp[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2WithTimestamp[T]) invoke(et typex.EventTime, val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative3[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative3[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative3[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative3[T1, T2]) invoke(et typex.EventTime, key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+// RegisterEmitter1 registers parameters from your DoFn with a
+// signature func(T) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter1[T]((*func(T))(nil))
+func RegisterEmitter1[T1 any](e *func(T1)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative1[T1]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter2 registers parameters from your DoFn with a
+// signature func(T1, T2) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil))
Review Comment:
+1 to Danny's comment. It's not necessary to duplicate the analysis. Users
*could* do things that aren't useful to them, but in practice likely won't.
The big risk with these is that users change types and end up not
registering the new ones. But we can ultimately give them a clear, non-failing
message listing every type their pipeline uses that is missing a registration,
along with code to copy and paste for the performance boost.
Issue Time Tracking
-------------------
Worklog Id: (was: 767426)
Time Spent: 8h 40m (was: 8.5h)
> [Go SDK] Allow users to optimize DoFns with a single generic registration function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 8h 40m
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator.
> This change allows them to use generics instead.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)