lostluck commented on code in PR #23285:
URL: https://github.com/apache/beam/pull/23285#discussion_r975537993
##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+ "cloud.google.com/go/spanner"
+ "context"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "google.golang.org/api/iterator"
+ "reflect"
+ "strings"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
Review Comment:
Please use the generic register package instead of the vanilla
beam.RegisterType call. See fhirio for an example:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/fhirio/read.go#L31
```
register.DoFn3x1[context.Context, []byte, func(beam.X),
error]((*queryFn)(nil))
register.Emitter1[beam.X]()
register.DoFn3x1[context.Context, int, func(*beam.X) bool,
error]((*writeFn)(nil))
register.Iter1[beam.X]()
```
Note to self: update textio to use the generic registers....
##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+ "cloud.google.com/go/spanner"
+ "context"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "google.golang.org/api/iterator"
+ "reflect"
+ "strings"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+ var columns []string
+
+ for i := 0; i < t.NumField(); i++ {
+ columns = append(columns, t.Field(i).Tag.Get("spanner"))
+ }
+
+ return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type)
beam.PCollection {
+ s = s.Scope("spanner.Read")
+
+ // TODO(herohde) 7/13/2017: using * is probably too inefficient. We
could infer
+ // a focused query from the type.
+
+ cols := strings.Join(columnsFromStruct(t), ",")
+
+ return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols,
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the
given
+// type, t. It returns a PCollection<t>.
Review Comment:
Since this is a naive implementation, it would be good to document that the
query will be processed on a single worker (to set performance expectations),
and to recommend adding a beam.Reshuffle after it if downstream splitting is
required. If this changes to an SDF, we can remove the note.
##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+ "cloud.google.com/go/spanner"
+ "context"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "google.golang.org/api/iterator"
+ "reflect"
+ "strings"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+ var columns []string
+
+ for i := 0; i < t.NumField(); i++ {
+ columns = append(columns, t.Field(i).Tag.Get("spanner"))
+ }
+
+ return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type)
beam.PCollection {
+ s = s.Scope("spanner.Read")
+
+ // TODO(herohde) 7/13/2017: using * is probably too inefficient. We
could infer
+ // a focused query from the type.
+
+ cols := strings.Join(columnsFromStruct(t), ",")
+
+ return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols,
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
Review Comment:
Either the QueryOptions struct should be unexported, with all functional
options that mutate it defined in this package, OR we should simply have a
QueryOptions struct with all of its fields exported.
Otherwise it's an unclear user affordance, especially when there are no
options currently available.
##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+ "cloud.google.com/go/spanner"
+ "context"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "google.golang.org/api/iterator"
+ "reflect"
+ "strings"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+ var columns []string
+
+ for i := 0; i < t.NumField(); i++ {
+ columns = append(columns, t.Field(i).Tag.Get("spanner"))
+ }
+
+ return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type)
beam.PCollection {
+ s = s.Scope("spanner.Read")
+
+ // TODO(herohde) 7/13/2017: using * is probably too inefficient. We
could infer
+ // a focused query from the type.
+
+ cols := strings.Join(columnsFromStruct(t), ",")
+
+ return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols,
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the
given
+// type, t. It returns a PCollection<t>.
+func Query(s beam.Scope, database, q string, t reflect.Type, options
...func(*QueryOptions) error) beam.PCollection {
+ s = s.Scope("spanner.Query")
+ return query(s, database, q, t, options...)
+}
+
+func query(s beam.Scope, database, query string, t reflect.Type, options
...func(*QueryOptions) error) beam.PCollection {
+ queryOptions := QueryOptions{}
+ for _, opt := range options {
+ if err := opt(&queryOptions); err != nil {
+ panic(err)
+ }
+ }
+
+ imp := beam.Impulse(s)
+ return beam.ParDo(s, &queryFn{Database: database, Query: query, Type:
beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var:
beam.XType, T: t})
+}
+
+type queryFn struct {
+ // Database is the spanner connection string
+ Database string `json:"database"`
+ // Table is the table identifier.
+ Query string `json:"query"`
+ // Type is the encoded schema type.
+ Type beam.EncodedType `json:"type"`
+ // Options specifies additional query execution options.
+ Options QueryOptions `json:"options"`
+}
+
+func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit
func(beam.X)) error {
+ client, err := spanner.NewClient(ctx, f.Database)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ // todo: Use Batch Read
+
+ stmt := spanner.Statement{SQL: f.Query}
+ it := client.Single().Query(ctx, stmt)
+ defer it.Stop()
+
+ for {
+ val := reflect.New(f.Type.T).Interface() // val : *T
+ row, err := it.Next()
+ if err != nil {
+ if err == iterator.Done {
+ break
+ }
+ return err
+ }
+
+ if err := row.ToStruct(val); err != nil {
+ return err
+ }
+
+ emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val)
+ }
+ return nil
+}
+
+// Write writes the elements of the given PCollection<T> to spanner. T is
required
+// to be the schema type.
Review Comment:
Please add a similar note that, in the present implementation, all writes
are performed on a single worker.
##########
sdks/go/pkg/beam/io/spannerio/spanner_test.go:
##########
@@ -0,0 +1,23 @@
+package spannerio
+
+import (
+ "github.com/stretchr/testify/assert"
+ "reflect"
+ "testing"
+)
+
+type TestDto struct {
+ One string `spanner:"one"`
+ Two int `spanner:"two"`
+}
+
+func TestColumnsFromStructReturnsColumns(t *testing.T) {
+ // arrange
+ // act
+ cols := columnsFromStruct(reflect.TypeOf(TestDto{}))
+
+ // assert
+ assert.NotEmpty(t, cols)
+ assert.Equal(t, "one", cols[0])
+ assert.Equal(t, "two", cols[1])
Review Comment:
Prefer using [cmp](https://pkg.go.dev/github.com/google/go-cmp/cmp) rather
than introducing an assert framework to the SDK.
##########
sdks/go/pkg/beam/io/spannerio/spanner.go:
##########
@@ -0,0 +1,163 @@
+package spannerio
+
+import (
+ "cloud.google.com/go/spanner"
+ "context"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "google.golang.org/api/iterator"
+ "reflect"
+ "strings"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
+}
+
+func columnsFromStruct(t reflect.Type) []string {
+ var columns []string
+
+ for i := 0; i < t.NumField(); i++ {
+ columns = append(columns, t.Field(i).Tag.Get("spanner"))
+ }
+
+ return columns
+}
+
+// Read reads all rows from the given table. The table must have a schema
+// compatible with the given type, t, and Read returns a PCollection<t>. If the
+// table has more rows than t, then Read is implicitly a projection.
+func Read(s beam.Scope, database, table string, t reflect.Type)
beam.PCollection {
+ s = s.Scope("spanner.Read")
+
+ // TODO(herohde) 7/13/2017: using * is probably too inefficient. We
could infer
+ // a focused query from the type.
+
+ cols := strings.Join(columnsFromStruct(t), ",")
+
+ return query(s, database, fmt.Sprintf("SELECT %v from [%v]", cols,
table), t)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+}
+
+// Query executes a query. The output must have a schema compatible with the
given
+// type, t. It returns a PCollection<t>.
+func Query(s beam.Scope, database, q string, t reflect.Type, options
...func(*QueryOptions) error) beam.PCollection {
+ s = s.Scope("spanner.Query")
+ return query(s, database, q, t, options...)
+}
+
+func query(s beam.Scope, database, query string, t reflect.Type, options
...func(*QueryOptions) error) beam.PCollection {
+ queryOptions := QueryOptions{}
+ for _, opt := range options {
+ if err := opt(&queryOptions); err != nil {
+ panic(err)
+ }
+ }
+
+ imp := beam.Impulse(s)
+ return beam.ParDo(s, &queryFn{Database: database, Query: query, Type:
beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var:
beam.XType, T: t})
+}
+
+type queryFn struct {
+ // Database is the spanner connection string
+ Database string `json:"database"`
+ // Table is the table identifier.
+ Query string `json:"query"`
+ // Type is the encoded schema type.
+ Type beam.EncodedType `json:"type"`
+ // Options specifies additional query execution options.
+ Options QueryOptions `json:"options"`
+}
+
+func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit
func(beam.X)) error {
+ client, err := spanner.NewClient(ctx, f.Database)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ // todo: Use Batch Read
+
+ stmt := spanner.Statement{SQL: f.Query}
+ it := client.Single().Query(ctx, stmt)
+ defer it.Stop()
+
+ for {
+ val := reflect.New(f.Type.T).Interface() // val : *T
+ row, err := it.Next()
+ if err != nil {
+ if err == iterator.Done {
+ break
+ }
+ return err
+ }
+
+ if err := row.ToStruct(val); err != nil {
+ return err
+ }
+
+ emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val)
+ }
+ return nil
+}
+
+// Write writes the elements of the given PCollection<T> to spanner. T is
required
+// to be the schema type.
+func Write(s beam.Scope, database, table string, col beam.PCollection) {
+ t := col.Type().Type()
Review Comment:
Consider adding validation that it's not a KV or CoGBK type (panicking with
an unsupported message in those cases), to avoid certain construction time
issues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]