[
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=767616&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767616
]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/May/22 21:38
Start Date: 07/May/22 21:38
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17574:
URL: https://github.com/apache/beam/pull/17574#discussion_r867397465
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et
typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative1[T]) invoke(val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et
typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key,
Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative1WithTimestamp[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1WithTimestamp[T]) Init(ctx context.Context, ws
[]typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1WithTimestamp[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative1WithTimestamp[T]) invoke(et typex.EventTime, val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2WithTimestamp[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) Init(ctx context.Context, ws
[]typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2WithTimestamp[T1, T2]) invoke(et typex.EventTime, key T1,
val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2:
val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+// RegisterEmitter1 registers parameters from your DoFn with a
+// signature func(T) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter1[T]((*func(T))(nil))
+func RegisterEmitter1[T1 any](e *func(T1)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative1[T1]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter2 registers parameters from your DoFn with a
+// signature func(T1, T2) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil))
+func RegisterEmitter2[T1, T2 any](e *func(T1, T2)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative2[T1, T2]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ if reflect.TypeOf(e).Elem().In(0) == typex.EventTimeType {
+ registerFunc = func(n exec.ElementProcessor)
exec.ReusableEmitter {
+ gen := &emitNative1WithTimestamp[T2]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter3 registers parameters from your DoFn with a
+// signature func(T1, T2, T3) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter3[T1, T2, T3]((*func(T1, T2, T3))(nil))
+func RegisterEmitter3[T1, T2, T3 any](e *func(T1, T2, T3)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative2WithTimestamp[T2, T3]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+type iterNative1[T any] struct {
+ s exec.ReStream
+ fn interface{}
+
+ // cur is the "current" stream, if any.
+ cur exec.Stream
+}
+
+func (v *iterNative1[T]) Init() error {
+ cur, err := v.s.Open()
+ if err != nil {
+ return err
+ }
+ v.cur = cur
+ return nil
+}
+
+func (v *iterNative1[T]) Value() interface{} {
+ return v.fn
+}
+
+func (v *iterNative1[T]) Reset() error {
+ if err := v.cur.Close(); err != nil {
+ return err
+ }
+ v.cur = nil
+ return nil
+}
+
+func (v *iterNative1[T]) invoke(value *T) bool {
+ elm, err := v.cur.Read()
+ if err != nil {
+ if err == io.EOF {
+ return false
+ }
+ panic(fmt.Sprintf("broken stream: %v", err))
+ }
+ *value = elm.Elm.(T)
+ return true
+}
+
+type iterNative2[T1, T2 any] struct {
+ s exec.ReStream
+ fn interface{}
+
+ // cur is the "current" stream, if any.
+ cur exec.Stream
+}
+
+func (v *iterNative2[T1, T2]) Init() error {
+ cur, err := v.s.Open()
+ if err != nil {
+ return err
+ }
+ v.cur = cur
+ return nil
+}
+
+func (v *iterNative2[T1, T2]) Value() interface{} {
+ return v.fn
+}
+
+func (v *iterNative2[T1, T2]) Reset() error {
+ if err := v.cur.Close(); err != nil {
+ return err
+ }
+ v.cur = nil
+ return nil
+}
+
+func (v *iterNative2[T1, T2]) invoke(key *T1, value *T2) bool {
+ elm, err := v.cur.Read()
+ if err != nil {
+ if err == io.EOF {
+ return false
+ }
+ panic(fmt.Sprintf("broken stream: %v", err))
+ }
+ *key = elm.Elm.(T1)
+ *value = elm.Elm2.(T2)
+ return true
+}
+
+// RegisterIter1 registers parameters from your DoFn with a
+// signature func(*T) bool and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterIter1[T]((*func(*T) bool)(nil))
+func RegisterIter1[T any](i *func(*T) bool) {
Review Comment:
Yeah, I think its a good idea to do that eventually, I'd scope that out of
this PR though. It shouldn't be too hard to build off of this one once we have
a pattern established though.
Issue Time Tracking
-------------------
Worklog Id: (was: 767616)
Time Spent: 9h (was: 8h 50m)
> [Go SDK] Allow users to optimize DoFns with a single generic registration
> function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 9h
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator.
> This updates to allow them to use generics instead.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)