capthiron commented on code in PR #23411:
URL: https://github.com/apache/beam/pull/23411#discussion_r1008315339


##########
sdks/go/pkg/beam/io/bigtableio/bigtable.go:
##########
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigtableio provides transformations and utilities to interact with
+// Google Bigtable. See also: https://cloud.google.com/bigtable/docs
+package bigtableio
+
+import (
+       "context"
+       "fmt"
+       "hash/fnv"
+       "reflect"
+
+       "cloud.google.com/go/bigtable"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers the package's DoFns, the iterator type they consume and the
+// keying function with Beam's register package, so the runtime can call them
+// through generated wrappers.
+func init() {
+       // Both DoFns share the ProcessElement shape (int, func(*Mutation) bool) error.
+       register.DoFn2x1[int, func(*Mutation) bool, error](&writeFn{})
+       register.DoFn2x1[int, func(*Mutation) bool, error](&writeBatchFn{})
+       // The iterator type is the same for both DoFns; registering it once is enough.
+       register.Iter1[*Mutation]()
+       // addGroupKeyFn is handed to beam.ParDo as a plain function by Write and WriteBatch.
+       register.Function1x2(addGroupKeyFn)
+}
+
+// Mutation is a serializable stand-in for bigtable.Mutation: it carries the
+// target row key together with the operations to be applied to that row.
+type Mutation struct {
+       RowKey   string // key of the row the operations apply to
+       Ops []Operation // operations recorded via Set, in call order
+
+       // Optional custom key for beam.GroupByKey; when empty, a fixed key of 1 is used.
+       GroupKey string
+}
+
+// Operation is a single raw change recorded inside a Mutation (see Mutation.Set).
+type Operation struct {
+       Family string // family argument passed to Set
+       Column string // column argument passed to Set
+       Ts bigtable.Timestamp // timestamp argument passed to Set
+       Value []byte // value argument passed to Set
+}
+
+// NewMutation creates an empty *Mutation addressed at the row identified by
+// rowKey, analogous to bigtable.NewMutation(). Operations are added with Set.
+func NewMutation(rowKey string) *Mutation {
+       m := new(Mutation)
+       m.RowKey = rowKey
+       return m
+}
+
+// Set records a write of value to the given column of the given family at
+// timestamp ts, analogous to bigtable.Mutation.Set().
+// The operation is only appended to m.Ops; nothing is sent to Bigtable here.
+// The timestamp will be truncated to millisecond granularity.
+// A timestamp of ServerTime means to use the server timestamp.
+func (m *Mutation) Set(family, column string, ts bigtable.Timestamp, value []byte) {
+       m.Ops = append(m.Ops, Operation{
+               Family: family,
+               Column: column,
+               Ts:     ts,
+               Value:  value,
+       })
+}
+
+// WithGroupKey sets a custom group key to be used by beam.GroupByKey and returns m.
+func (m *Mutation) WithGroupKey(key string) *Mutation {
+       m.GroupKey = key
+       return m // same pointer is returned so calls can be chained
+}
+
+// Write writes the elements of the given PCollection<bigtableio.Mutation> to
+// the Bigtable table identified by project, instanceID and table.
+//
+// Elements are keyed by addGroupKeyFn, grouped with beam.GroupByKey and then
+// applied by writeFn. Write panics at pipeline construction time if the
+// element type of col is not bigtableio.Mutation.
+func Write(s beam.Scope, project, instanceID, table string, col beam.PCollection) {
+       t := col.Type().Type()
+       if err := mustBeBigtableioMutation(t); err != nil {
+               panic(err)
+       }
+
+       s = s.Scope("bigtable.Write")
+
+       pre := beam.ParDo(s, addGroupKeyFn, col)
+       post := beam.GroupByKey(s, pre)
+       beam.ParDo0(s, &writeFn{
+               Project:    project,
+               InstanceID: instanceID,
+               TableName:  table,
+               Type:       beam.EncodedType{T: t},
+       }, post)
+}
+
+// WriteBatch writes the elements of the given 
PCollection<bigtableio.Mutation> 
+// to bigtable using bigtable.ApplyBulk().
+// For the underlying bigtable.ApplyBulk function to work properly 
+// the maximum number of operations per bigtableio.Mutation of the input
+// PCollection must be given (maxOpsPerMutation). This is necessary due to 
+// the maximum amount of mutations allowed per bulk operation (100,000),
+// see https://cloud.google.com/bigtable/docs/writes#batch for more.
+func WriteBatch(s beam.Scope, project, instanceID, table string, 
maxOpsPerMutation uint, col beam.PCollection) {
+       t := col.Type().Type()
+       err := mustBeBigtableioMutation(t)
+       if err != nil {
+               panic(err)
+       }
+
+       if maxOpsPerMutation == 0 {
+               panic("maxOpsPerMutation must not be 0")
+       }
+
+       if maxOpsPerMutation > 100000 {
+               panic("maxOpsPerMutation must not be greater than 100,000, see 
https://cloud.google.com/bigtable/docs/writes#batch";)
+       }
+
+       s = s.Scope("bigtable.WriteBatch")
+
+       pre := beam.ParDo(s, addGroupKeyFn, col)
+       post := beam.GroupByKey(s, pre)
+       beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID, 
TableName: table, MaxOpsPerMutation: maxOpsPerMutation, Type: 
beam.EncodedType{T: t}}, post)
+}
+
+// addGroupKeyFn pairs a Mutation with the integer key used by beam.GroupByKey.
+// Mutations without a GroupKey all share the fixed key 1; otherwise the key is
+// the hash of the GroupKey as computed by hashStringToInt.
+func addGroupKeyFn(mutation Mutation) (int, Mutation) {
+       if mutation.GroupKey == "" {
+               return 1, mutation
+       }
+       return hashStringToInt(mutation.GroupKey), mutation
+}
+
+// hashStringToInt returns the 32-bit FNV-1a hash of s converted to an int.
+func hashStringToInt(s string) int {
+       hasher := fnv.New32a()
+       // The result of Write is discarded, exactly as in a bare call.
+       _, _ = hasher.Write([]byte(s))
+       sum := hasher.Sum32()
+       return int(sum)
+}
+
+func mustBeBigtableioMutation(t reflect.Type) error {
+       if t != reflect.TypeOf(Mutation{}) {
+               return fmt.Errorf("type must be bigtableio.Mutation but is: 
%v", t)
+       }
+       return nil
+}
+
+type writeFn struct { // DoFn that applies grouped Mutations one by one via Table.Apply
+       // Project is the project passed to bigtable.NewClient.
+       Project string `json:"project"`
+       // InstanceID is the bigtable instanceID passed to bigtable.NewClient.
+       InstanceID string `json:"instanceId"`
+       // Client is the bigtable.Client; created in StartBundle, closed in FinishBundle.
+       Client *bigtable.Client `json:"client"`
+       // TableName is the table identifier opened on Client in StartBundle.
+       TableName string `json:"tableName"`
+       // Table is the bigtable.Table handle obtained from Client.Open in StartBundle.
+       Table *bigtable.Table `json:"table"`
+       // Type is the encoded element type of the input PCollection, set by Write.
+       Type beam.EncodedType `json:"type"`
+}
+
+// StartBundle creates a Bigtable data client for f.Project / f.InstanceID and
+// opens f.TableName on it. The client is closed again in FinishBundle.
+// A client creation failure is returned wrapped, so callers can inspect the
+// cause with errors.Is / errors.As.
+func (f *writeFn) StartBundle() error {
+       var err error
+       f.Client, err = bigtable.NewClient(context.Background(), f.Project, f.InstanceID)
+       if err != nil {
+               return fmt.Errorf("could not create data operations client: %w", err)
+       }
+
+       f.Table = f.Client.Open(f.TableName)
+       return nil
+}
+
+// FinishBundle closes the client created in StartBundle. A close failure is
+// returned wrapped so the underlying cause stays inspectable.
+func (f *writeFn) FinishBundle() error {
+       if err := f.Client.Close(); err != nil {
+               return fmt.Errorf("could not close data operations client: %w", err)
+       }
+       return nil
+}
+
+// ProcessElement applies every Mutation grouped under key to f.Table, issuing
+// one Apply call per Mutation, and stops at the first failure. The key only
+// served for grouping and is not used here. The returned error wraps the
+// Apply error and names the affected row key.
+func (f *writeFn) ProcessElement(key int, values func(*Mutation) bool) error {
+       var mutation Mutation
+       for values(&mutation) {
+               err := f.Table.Apply(context.Background(), mutation.RowKey, getBigtableMutation(mutation))
+               if err != nil {
+                       return fmt.Errorf("could not apply mutation for row key='%s': %w", mutation.RowKey, err)
+               }
+       }
+
+       return nil
+}
+
+// writeBatchFn is the DoFn behind WriteBatch. Its Client and Table fields are
+// populated in StartBundle; the remaining fields are set by WriteBatch.
+type writeBatchFn struct {
+       // Project is the project passed to bigtable.NewClient.
+       Project string `json:"project"`
+       // InstanceID is the bigtable instanceID passed to bigtable.NewClient.
+       InstanceID string `json:"instanceId"`
+       // Client is the bigtable.Client; created in StartBundle, closed in FinishBundle.
+       Client *bigtable.Client `json:"client"`
+       // TableName is the table identifier opened on Client in StartBundle.
+       TableName string `json:"tableName"`
+       // Table is the bigtable.Table handle obtained from Client.Open in StartBundle.
+       Table *bigtable.Table `json:"table"`
+       // MaxOpsPerMutation indicates how many operations are contained within
+       // a bigtableio.Mutation at max.
+       MaxOpsPerMutation uint `json:"maxOpsPerMutation"`
+       // Type is the encoded element type of the input PCollection.
+       Type beam.EncodedType `json:"type"`
+}
+
+// StartBundle creates a Bigtable data client for f.Project / f.InstanceID and
+// opens f.TableName on it. The client is closed again in FinishBundle.
+// A client creation failure is returned wrapped, so callers can inspect the
+// cause with errors.Is / errors.As.
+func (f *writeBatchFn) StartBundle() error {
+       var err error
+       f.Client, err = bigtable.NewClient(context.Background(), f.Project, f.InstanceID)
+       if err != nil {
+               return fmt.Errorf("could not create data operations client: %w", err)
+       }
+
+       f.Table = f.Client.Open(f.TableName)
+       return nil
+}
+
+// FinishBundle closes the client created in StartBundle. A close failure is
+// returned wrapped so the underlying cause stays inspectable.
+func (f *writeBatchFn) FinishBundle() error {
+       if err := f.Client.Close(); err != nil {
+               return fmt.Errorf("could not close data operations client: %w", err)
+       }
+       return nil
+}
+
+func (f *writeBatchFn) ProcessElement(key int, values func(*Mutation) bool) 
error {
+       maxMutationsPerBatch := 100000 / int(f.MaxOpsPerMutation)
+
+       var rowKeys []string
+       var mutations []*bigtable.Mutation
+
+       mutationsAdded := 0
+
+       var mutation Mutation
+       for values(&mutation) {
+
+               rowKeys = append(rowKeys, mutation.RowKey)
+               mutations = append(mutations, getBigtableMutation(mutation))
+               mutationsAdded += int(f.MaxOpsPerMutation)
+
+               if (mutationsAdded + int(f.MaxOpsPerMutation)) > 
maxMutationsPerBatch {
+                       err := 
tryApplyBulk(f.Table.ApplyBulk(context.Background(), rowKeys, mutations))

Review Comment:
   Refactored this bit to have context.Context as parameter of ProcessElement.
   Thanks for this hint :))



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to