lostluck commented on code in PR #23285:
URL: https://github.com/apache/beam/pull/23285#discussion_r975537993


##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "google.golang.org/api/iterator"
+       "reflect"
+       "strings"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())

Review Comment:
   Please use the generic `register` package instead of the vanilla
`beam.RegisterType` implementation. See fhirio for an example of its use:
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/fhirio/read.go#L31
   
   ```
   register.DoFn3x1[context.Context, []byte, func(beam.X), error]((*queryFn)(nil))
   register.Emitter1[beam.X]()
   register.DoFn3x1[context.Context, int, func(*beam.X) bool, error]((*writeFn)(nil))
   register.Iter1[beam.X]()
   ```
   
   Note to self: update textio to use the generic registers.



##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "google.golang.org/api/iterator"
+       "reflect"
+       "strings"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+       var columns []string
+
+       for i := 0; i < t.NumField(); i++ {
+               columns = append(columns, t.Field(i).Tag.Get("spanner"))
+       }
+
+       return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type) 
beam.PCollection {
+       s = s.Scope("spanner.Read")
+
+       // TODO(herohde) 7/13/2017: using * is probably too inefficient. We 
could infer
+       // a focused query from the type.
+
+       cols := strings.Join(columnsFromStruct(t), ",")
+
+       return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols, 
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the 
given
+// type, t. It returns a PCollection<t>.

Review Comment:
   Since this is a naive implementation, it would be good to document that the
query will be processed on a single worker. That sets performance expectations
for the query. The note should also recommend adding a beam.Reshuffle after this
transform if downstream splitting is required. If this changes to an SDF, we can
remove the note.



##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "google.golang.org/api/iterator"
+       "reflect"
+       "strings"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+       var columns []string
+
+       for i := 0; i < t.NumField(); i++ {
+               columns = append(columns, t.Field(i).Tag.Get("spanner"))
+       }
+
+       return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type) 
beam.PCollection {
+       s = s.Scope("spanner.Read")
+
+       // TODO(herohde) 7/13/2017: using * is probably too inefficient. We 
could infer
+       // a focused query from the type.
+
+       cols := strings.Join(columnsFromStruct(t), ",")
+
+       return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols, 
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {

Review Comment:
   Either the QueryOptions struct should be unexported, with all functional
options that mutate it defined in this package, OR we should simply have an
exported QueryOptions struct whose fields are all exported.
   
   Otherwise it's an unclear user affordance, especially since there are
currently no options available.



##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "google.golang.org/api/iterator"
+       "reflect"
+       "strings"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+       var columns []string
+
+       for i := 0; i < t.NumField(); i++ {
+               columns = append(columns, t.Field(i).Tag.Get("spanner"))
+       }
+
+       return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type) 
beam.PCollection {
+       s = s.Scope("spanner.Read")
+
+       // TODO(herohde) 7/13/2017: using * is probably too inefficient. We 
could infer
+       // a focused query from the type.
+
+       cols := strings.Join(columnsFromStruct(t), ",")
+
+       return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols, 
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the 
given
+// type, t. It returns a PCollection<t>.
+func Query(s beam.Scope, database, q string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
+       s = s.Scope("spanner.Query")
+       return query(s, database, q, t, options...)
+}
+
+func query(s beam.Scope, database, query string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
+       queryOptions := QueryOptions{}
+       for _, opt := range options {
+               if err := opt(&queryOptions); err != nil {
+                       panic(err)
+               }
+       }
+
+       imp := beam.Impulse(s)
+       return beam.ParDo(s, &queryFn{Database: database, Query: query, Type: 
beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var: 
beam.XType, T: t})
+}
+
+type queryFn struct {
+       // Database is the spanner connection string
+       Database string `json:"database"`
+       // Table is the table identifier.
+       Query string `json:"query"`
+       // Type is the encoded schema type.
+       Type beam.EncodedType `json:"type"`
+       // Options specifies additional query execution options.
+       Options QueryOptions `json:"options"`
+}
+
+func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit 
func(beam.X)) error {
+       client, err := spanner.NewClient(ctx, f.Database)
+       if err != nil {
+               return err
+       }
+       defer client.Close()
+
+       // todo: Use Batch Read
+
+       stmt := spanner.Statement{SQL: f.Query}
+       it := client.Single().Query(ctx, stmt)
+       defer it.Stop()
+
+       for {
+               val := reflect.New(f.Type.T).Interface() // val : *T
+               row, err := it.Next()
+               if err != nil {
+                       if err == iterator.Done {
+                               break
+                       }
+                       return err
+               }
+
+               if err := row.ToStruct(val); err != nil {
+                       return err
+               }
+
+               emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val)
+       }
+       return nil
+}
+
+// Write writes the elements of the given PCollection<T> to spanner. T is 
required
+// to be the schema type.

Review Comment:
   Please add a similar note that, in the present implementation, writes are
performed on a single worker machine.



##########
sdks/go/pkg/beam/io/spannerio/spanner_test.go:
##########
@@ -0,0 +1,23 @@
+package spannerio
+
+import (
+       "github.com/stretchr/testify/assert"
+       "reflect"
+       "testing"
+)
+
+type TestDto struct {
+       One string `spanner:"one"`
+       Two int    `spanner:"two"`
+}
+
+func TestColumnsFromStructReturnsColumns(t *testing.T) {
+       // arrange
+       // act
+       cols := columnsFromStruct(reflect.TypeOf(TestDto{}))
+
+       // assert
+       assert.NotEmpty(t, cols)
+       assert.Equal(t, "one", cols[0])
+       assert.Equal(t, "two", cols[1])

Review Comment:
   Prefer using [cmp](https://pkg.go.dev/github.com/google/go-cmp/cmp) instead,
rather than introducing an assertion framework to the SDK.



##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "google.golang.org/api/iterator"
+       "reflect"
+       "strings"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+       var columns []string
+
+       for i := 0; i < t.NumField(); i++ {
+               columns = append(columns, t.Field(i).Tag.Get("spanner"))
+       }
+
+       return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type) 
beam.PCollection {
+       s = s.Scope("spanner.Read")
+
+       // TODO(herohde) 7/13/2017: using * is probably too inefficient. We 
could infer
+       // a focused query from the type.
+
+       cols := strings.Join(columnsFromStruct(t), ",")
+
+       return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols, 
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the 
given
+// type, t. It returns a PCollection<t>.
+func Query(s beam.Scope, database, q string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
+       s = s.Scope("spanner.Query")
+       return query(s, database, q, t, options...)
+}
+
+func query(s beam.Scope, database, query string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
+       queryOptions := QueryOptions{}
+       for _, opt := range options {
+               if err := opt(&queryOptions); err != nil {
+                       panic(err)
+               }
+       }
+
+       imp := beam.Impulse(s)
+       return beam.ParDo(s, &queryFn{Database: database, Query: query, Type: 
beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var: 
beam.XType, T: t})
+}
+
+type queryFn struct {
+       // Database is the spanner connection string
+       Database string `json:"database"`
+       // Table is the table identifier.
+       Query string `json:"query"`
+       // Type is the encoded schema type.
+       Type beam.EncodedType `json:"type"`
+       // Options specifies additional query execution options.
+       Options QueryOptions `json:"options"`
+}
+
+func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit 
func(beam.X)) error {
+       client, err := spanner.NewClient(ctx, f.Database)
+       if err != nil {
+               return err
+       }
+       defer client.Close()
+
+       // todo: Use Batch Read
+
+       stmt := spanner.Statement{SQL: f.Query}
+       it := client.Single().Query(ctx, stmt)
+       defer it.Stop()
+
+       for {
+               val := reflect.New(f.Type.T).Interface() // val : *T
+               row, err := it.Next()
+               if err != nil {
+                       if err == iterator.Done {
+                               break
+                       }
+                       return err
+               }
+
+               if err := row.ToStruct(val); err != nil {
+                       return err
+               }
+
+               emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val)
+       }
+       return nil
+}
+
+// Write writes the elements of the given PCollection<T> to spanner. T is 
required
+// to be the schema type.
+func Write(s beam.Scope, database, table string, col beam.PCollection) {
+       t := col.Type().Type()

Review Comment:
   Consider adding validation that the type is not a KV or CoGBK type. In those
cases, panic with an "unsupported" message. This avoids certain
construction-time issues.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to