riteshghorse commented on code in PR #22826:
URL: https://github.com/apache/beam/pull/22826#discussion_r953980747
##########
sdks/go/pkg/beam/core/state/state.go:
##########
@@ -209,3 +224,147 @@ func MakeBagState[T any](k string) Bag[T] {
Key: k,
}
}
+
+// Combining is used to read and write global pipeline state representing a single combined value.
+// It uses 3 generic values, [T1, T2, T3], to represent the accumulator, input, and output types respectively.
+// Key represents the key used to look up this state.
+type Combining[T1, T2, T3 any] struct {
+ Key string // key under which this state is stored and looked up
+ accumFn interface{} // NOTE(review): presumably the user's combine function; not referenced in the visible code — confirm
+}
+
+// Add is used to add an element to the combining pipeline state.
+//
+// It reads the current accumulator, combines val into it, and writes the
+// result back as a single value state under s.Key. It returns any error from
+// reading the accumulator or from writing the state, and panics if an
+// accumulator exists but the provider supplies neither an AddInput nor a
+// MergeAccumulators function for s.Key.
+func (s *Combining[T1, T2, T3]) Add(p Provider, val T2) error {
+	// We will always maintain a single accumulated value as a value state.
+	// Therefore, when we add we must first read the current accumulator so that we can add to it.
+	acc, ok, err := s.readAccumulator(p)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		// If no accumulator, that means that the CreateAccumulator function doesn't exist
+		// and our value is our initial accumulator.
+		return p.WriteValueState(Transaction{
+			Key:  s.Key,
+			Type: TransactionTypeSet,
+			Val:  val,
+		})
+	}
+
+	if ai := p.AddInputFn(s.Key); ai != nil {
+		var newVal interface{}
+		if f, ok := ai.(reflectx.Func2x1); ok {
+			newVal = f.Call2x1(acc, val)
+		} else {
+			// When the assertion fails f is the nil zero value, so the
+			// generic call has to go through ai itself.
+			newVal = ai.Call([]interface{}{acc, val})[0]
+		}
+		return p.WriteValueState(Transaction{
+			Key:  s.Key,
+			Type: TransactionTypeSet,
+			Val:  newVal,
+		})
+	}
+	// If AddInput isn't defined, that means we must just have one accumulator type identical to the input type.
+	if ma := p.MergeAccumulatorsFn(s.Key); ma != nil {
+		var newVal interface{}
+		if f, ok := ma.(reflectx.Func2x1); ok {
+			newVal = f.Call2x1(acc, val)
+		} else {
+			// Same as above: call through ma, not the nil f from the failed assertion.
+			newVal = ma.Call([]interface{}{acc, val})[0]
+		}
+		return p.WriteValueState(Transaction{
+			Key:  s.Key,
+			Type: TransactionTypeSet,
+			Val:  newVal,
+		})
+	}
+
+	// Should be taken care of by previous validation
+	panic(fmt.Sprintf("MergeAccumulators must be defined on accumulator %v", s))
+}
+
+// Read is used to read this instance of global pipeline state representing a combiner.
+// When a value is not found, returns an empty list and false.
+func (s *Combining[T1, T2, T3]) Read(p Provider) (T3, bool, error) {
+ acc, ok, err := s.readAccumulator(p)
+ if !ok || err != nil {
+ var val T3
+ return val, ok, err
Review Comment:
Since this function returns an empty list, do we expect users to specify the
type as `[]int`, or just `int` with `[]T3` returned from here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]