This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aa1d7126499 interface{} -> any for remaining references (#24625) aa1d7126499 is described below commit aa1d7126499c897bc7881606ea2481b18f7be3ba Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Fri Dec 9 14:57:47 2022 -0500 interface{} -> any for remaining references (#24625) * interface{} -> any for rest of pkg/beam * convert rest of folder --- sdks/go/cmd/specialize/main.go | 8 +++--- sdks/go/cmd/symtab/main.go | 2 +- .../nativepubsubio/subscriptiontracker.go | 6 ++-- sdks/go/examples/snippets/01_03intro.go | 8 +++--- sdks/go/examples/snippets/06schemas.go | 8 +++--- sdks/go/examples/snippets/06schemas_test.go | 2 +- sdks/go/examples/snippets/12splittabledofns.go | 2 +- sdks/go/pkg/beam/coder.go | 8 +++--- sdks/go/pkg/beam/coder_test.go | 6 ++-- sdks/go/pkg/beam/combine.go | 8 +++--- sdks/go/pkg/beam/create.go | 12 ++++---- sdks/go/pkg/beam/create_test.go | 24 ++++++++-------- sdks/go/pkg/beam/encoding.go | 32 +++++++++++----------- sdks/go/pkg/beam/example_schema_test.go | 12 ++++---- sdks/go/pkg/beam/forward.go | 6 ++-- sdks/go/pkg/beam/option.go | 4 +-- sdks/go/pkg/beam/options/jobopts/stringSlice.go | 2 +- sdks/go/pkg/beam/pardo.go | 22 +++++++-------- sdks/go/pkg/beam/partition.go | 14 +++++----- sdks/go/pkg/beam/partition_test.go | 8 +++--- sdks/go/pkg/beam/provision/provision.go | 4 +-- .../beam/runners/dataflow/dataflowlib/messages.go | 20 +++++++------- .../beam/runners/dataflow/dataflowlib/metrics.go | 6 ++-- .../runners/dataflow/dataflowlib/metrics_test.go | 2 +- sdks/go/pkg/beam/runners/direct/buffer.go | 2 +- sdks/go/pkg/beam/runners/vet/vet.go | 6 ++-- sdks/go/pkg/beam/schema.go | 8 +++--- .../go/pkg/beam/transforms/filter/distinct_test.go | 28 +++++++++---------- sdks/go/pkg/beam/transforms/filter/filter.go | 4 +-- sdks/go/pkg/beam/transforms/filter/filter_test.go | 4 +-- sdks/go/pkg/beam/transforms/stats/max_switch.go | 2 +- sdks/go/pkg/beam/transforms/stats/max_switch.tmpl | 2 +- sdks/go/pkg/beam/transforms/stats/mean.go | 2 +- sdks/go/pkg/beam/transforms/stats/min_switch.go | 2 +- sdks/go/pkg/beam/transforms/stats/min_switch.tmpl | 2 +- sdks/go/pkg/beam/transforms/stats/quantiles.go | 22 +++++++-------- .../go/pkg/beam/transforms/stats/quantiles_test.go | 6 ++-- sdks/go/pkg/beam/transforms/stats/sum_switch.go | 2 +- sdks/go/pkg/beam/transforms/stats/sum_switch.tmpl | 2 +- sdks/go/pkg/beam/transforms/stats/util.go | 4 +-- sdks/go/pkg/beam/transforms/top/top.go | 16 +++++------ .../pkg/beam/transforms/xlang/python/external.go | 8 +++--- sdks/go/pkg/beam/util.go | 2 +- sdks/go/pkg/beam/xlang.go | 2 +- sdks/go/test/integration/flags.go | 2 +- .../integration/io/xlang/bigquery/bigquery_test.go | 9 +++--- sdks/go/test/integration/primitives/windowinto.go | 6 ++-- .../go/test/regression/coders/fromyaml/fromyaml.go | 22 +++++++-------- sdks/go/test/regression/pardo.go | 4 +-- 49 files changed, 198 insertions(+), 197 deletions(-) diff --git a/sdks/go/cmd/specialize/main.go b/sdks/go/cmd/specialize/main.go index de23a62bdd5..009340fd29d 100644 --- a/sdks/go/cmd/specialize/main.go +++ b/sdks/go/cmd/specialize/main.go @@ -195,7 +195,7 @@ func makeName(t string) string { // Useful template functions -var funcMap template.FuncMap = map[string]interface{}{ +var funcMap template.FuncMap = map[string]any{ "join": strings.Join, "upto": upto, "mkargs": mkargs, @@ -253,8 +253,8 @@ func mult(i int, j int) int { return i * j } -func dict(values ...interface{}) map[string]interface{} { - dict := make(map[string]interface{}, len(values)/2) +func dict(values ...any) map[string]any { + dict := make(map[string]any, len(values)/2) if len(values)%2 != 0 { panic("Invalid dictionary call") } @@ -302,7 +302,7 @@ func genericTypingRepresentation(in int, out int, includeType bool) string { return typing } -func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string { +func possibleBundleLifecycleParameterCombos(numInInterface any, processElementInInterface any) [][]string { numIn := numInInterface.(int) processElementIn := processElementInInterface.(int) orderedKnownParameterOptions := []string{"context.Context", "typex.PaneInfo", "[]typex.Window", "typex.EventTime", "typex.BundleFinalization"} diff --git a/sdks/go/cmd/symtab/main.go b/sdks/go/cmd/symtab/main.go index c9df1f0ad60..6628cc8e439 100644 --- a/sdks/go/cmd/symtab/main.go +++ b/sdks/go/cmd/symtab/main.go @@ -77,7 +77,7 @@ func main() { log.Fatalf("error creating function out of address") return } - ret.Fn.Call([]interface{}{arg}) + ret.Fn.Call([]any{arg}) // Checks that function was executed. if counter != 1 { diff --git a/sdks/go/examples/native_wordcap/nativepubsubio/subscriptiontracker.go b/sdks/go/examples/native_wordcap/nativepubsubio/subscriptiontracker.go index a302ff39fb4..5001f5b8f12 100644 --- a/sdks/go/examples/native_wordcap/nativepubsubio/subscriptiontracker.go +++ b/sdks/go/examples/native_wordcap/nativepubsubio/subscriptiontracker.go @@ -30,7 +30,7 @@ func NewSubscriptionRTracker(entry string) *SubscriptionRTracker { // TryClaim returns true iff the given position is a string and matches the underlying // subscription ID. -func (s *SubscriptionRTracker) TryClaim(pos interface{}) bool { +func (s *SubscriptionRTracker) TryClaim(pos any) bool { posString, ok := pos.(string) return ok && posString == s.Subscription } @@ -38,7 +38,7 @@ func (s *SubscriptionRTracker) TryClaim(pos interface{}) bool { // TrySplit is a no-op for the StaticRTracker in the normal case and moves the subscription // to the residual in the checkpointing case, marking itself as done to keep the logical checks // around SDF data loss happy. -func (s *SubscriptionRTracker) TrySplit(frac float64) (primary, residual interface{}, err error) { +func (s *SubscriptionRTracker) TrySplit(frac float64) (primary, residual any, err error) { if frac == 0.0 { resid := s.Subscription s.Subscription = "" @@ -73,6 +73,6 @@ func (s *SubscriptionRTracker) IsBounded() bool { } // GetRestriction returns the name of the subscription. -func (s *SubscriptionRTracker) GetRestriction() interface{} { +func (s *SubscriptionRTracker) GetRestriction() any { return s.Subscription } diff --git a/sdks/go/examples/snippets/01_03intro.go b/sdks/go/examples/snippets/01_03intro.go index 38d6fe4ec24..e3754fc9e5c 100644 --- a/sdks/go/examples/snippets/01_03intro.go +++ b/sdks/go/examples/snippets/01_03intro.go @@ -50,7 +50,7 @@ func PipelineConstruction() { // [END pipelines_constructing_reading] - _ = []interface{}{pipeline, scope, lines} + _ = []any{pipeline, scope, lines} } // Create demonstrates using beam.CreateList. @@ -73,7 +73,7 @@ func Create() { // to the pipeline. linesPCol := beam.CreateList(s, lines) // [END model_pcollection] - _ = []interface{}{p, linesPCol} + _ = []any{p, linesPCol} } // PipelineOptions shows basic pipeline options using flags. @@ -86,7 +86,7 @@ func PipelineOptions() { ) // [END pipeline_options_define_custom] - _ = []interface{}{input, output} + _ = []any{input, output} } // PipelineOptionsCustom shows slightly less basic pipeline options using flags. @@ -98,5 +98,5 @@ func PipelineOptionsCustom() { ) // [END pipeline_options_define_custom_with_help_and_default] - _ = []interface{}{input, output} + _ = []any{input, output} } diff --git a/sdks/go/examples/snippets/06schemas.go b/sdks/go/examples/snippets/06schemas.go index d68bc7cf70d..63ffb425a62 100644 --- a/sdks/go/examples/snippets/06schemas.go +++ b/sdks/go/examples/snippets/06schemas.go @@ -98,7 +98,7 @@ func (p *TimestampNanosProvider) FromLogicalType(rt reflect.Type) (reflect.Type, } // BuildEncoder builds a Beam schema encoder for the TimestampNanos type. -func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { +func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } @@ -106,7 +106,7 @@ func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{} if err != nil { return nil, err } - return func(iface interface{}, w io.Writer) error { + return func(iface any, w io.Writer) error { v := iface.(TimestampNanos) return enc(tnStorage{ Seconds: v.Seconds(), @@ -116,7 +116,7 @@ func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{} } // BuildDecoder builds a Beam schema decoder for the TimestampNanos type. -func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { +func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } @@ -124,7 +124,7 @@ func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) if err != nil { return nil, err } - return func(r io.Reader) (interface{}, error) { + return func(r io.Reader) (any, error) { s, err := dec(r) if err != nil { return nil, err diff --git a/sdks/go/examples/snippets/06schemas_test.go b/sdks/go/examples/snippets/06schemas_test.go index 1353c976987..325a96274c2 100644 --- a/sdks/go/examples/snippets/06schemas_test.go +++ b/sdks/go/examples/snippets/06schemas_test.go @@ -149,7 +149,7 @@ func TestSchema_validate(t *testing.T) { tests := []struct { rt reflect.Type p beam.SchemaProvider - logical, storage interface{} + logical, storage any }{ { rt: tnType, diff --git a/sdks/go/examples/snippets/12splittabledofns.go b/sdks/go/examples/snippets/12splittabledofns.go index a903881c749..0ad74e240c8 100644 --- a/sdks/go/examples/snippets/12splittabledofns.go +++ b/sdks/go/examples/snippets/12splittabledofns.go @@ -28,7 +28,7 @@ type SomeService struct { ThrottlingErr error } -func (s *SomeService) readNextRecords(position interface{}) ([]Record, error) { +func (s *SomeService) readNextRecords(position any) ([]Record, error) { return []Record{}, nil } diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go index 3a0552e53eb..062bb337e8d 100644 --- a/sdks/go/pkg/beam/coder.go +++ b/sdks/go/pkg/beam/coder.go @@ -105,7 +105,7 @@ type execEncoder struct { coder *coder.Coder } -func (e *execEncoder) Encode(element interface{}, w io.Writer) error { +func (e *execEncoder) Encode(element any, w io.Writer) error { return e.enc.Encode(&exec.FullValue{Elm: element}, w) } @@ -129,7 +129,7 @@ type execDecoder struct { coder *coder.Coder } -func (d *execDecoder) Decode(r io.Reader) (interface{}, error) { +func (d *execDecoder) Decode(r io.Reader) (any, error) { fv, err := d.dec.Decode(r) if err != nil { return nil, err @@ -331,10 +331,10 @@ func newJSONCoder(t reflect.Type) (*coder.CustomCoder, error) { // These maps and mutexes are actuated per element, which can be expensive. var ( encMu sync.Mutex - schemaEncs = map[reflect.Type]func(interface{}, io.Writer) error{} + schemaEncs = map[reflect.Type]func(any, io.Writer) error{} decMu sync.Mutex - schemaDecs = map[reflect.Type]func(io.Reader) (interface{}, error){} + schemaDecs = map[reflect.Type]func(io.Reader) (any, error){} ) // schemaEnc encodes the supplied value as beam schema. diff --git a/sdks/go/pkg/beam/coder_test.go b/sdks/go/pkg/beam/coder_test.go index 46f5f294b0a..52981de28ee 100644 --- a/sdks/go/pkg/beam/coder_test.go +++ b/sdks/go/pkg/beam/coder_test.go @@ -26,7 +26,7 @@ import ( func TestJSONCoder(t *testing.T) { v := "teststring" - tests := []interface{}{ + tests := []any{ 43, 12431235, -2, @@ -70,7 +70,7 @@ func TestJSONCoder(t *testing.T) { func TestSchemaCoder(t *testing.T) { v := "teststring" - tests := []interface{}{ + tests := []any{ struct { A int B *string @@ -126,7 +126,7 @@ func TestCoders(t *testing.T) { A [4]int } schema.RegisterType(reflect.TypeOf((*regTestType)(nil))) - tests := []interface{}{ + tests := []any{ 43, 12431235, -2, diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go index a7a563794f8..8845b50c331 100644 --- a/sdks/go/pkg/beam/combine.go +++ b/sdks/go/pkg/beam/combine.go @@ -24,21 +24,21 @@ import ( // Combine inserts a global Combine transform into the pipeline. It // expects a PCollection<T> as input where T is a concrete type. // Combine supports TypeDefinition options for binding generic types in combinefn. -func Combine(s Scope, combinefn interface{}, col PCollection, opts ...Option) PCollection { +func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection { return Must(TryCombine(s, combinefn, col, opts...)) } // CombinePerKey inserts a GBK and per-key Combine transform into the pipeline. It // expects a PCollection<KV<K,T>>. The CombineFn may optionally take a key parameter. // CombinePerKey supports TypeDefinition options for binding generic types in combinefn. -func CombinePerKey(s Scope, combinefn interface{}, col PCollection, opts ...Option) PCollection { +func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection { return Must(TryCombinePerKey(s, combinefn, col, opts...)) } // TryCombine attempts to insert a global Combine transform into the pipeline. It may fail // for multiple reasons, notably that the combinefn is not valid or cannot be bound // -- due to type mismatch, say -- to the incoming PCollections. -func TryCombine(s Scope, combinefn interface{}, col PCollection, opts ...Option) (PCollection, error) { +func TryCombine(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error) { pre := AddFixedKey(s, col) post, err := TryCombinePerKey(s, combinefn, pre, opts...) if err != nil { @@ -54,7 +54,7 @@ func addCombinePerKeyCtx(err error, s Scope) error { // TryCombinePerKey attempts to insert a per-key Combine transform into the pipeline. It may fail // for multiple reasons, notably that the combinefn is not valid or cannot be bound // -- due to type mismatch, say -- to the incoming PCollection. -func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection, opts ...Option) (PCollection, error) { +func TryCombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error) { s = s.Scope(graph.CombinePerKeyScope) ValidateKVType(col) side, typedefs, err := validate(s, col, opts) diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go index c7916d43f94..4ddc5396c72 100644 --- a/sdks/go/pkg/beam/create.go +++ b/sdks/go/pkg/beam/create.go @@ -28,19 +28,19 @@ import ( // The returned PCollections can be used as any other PCollections. The values // are JSON-coded. Each runner may place limits on the sizes of the values and // Create should generally only be used for small collections. -func Create(s Scope, values ...interface{}) PCollection { +func Create(s Scope, values ...any) PCollection { return Must(TryCreate(s, values...)) } // CreateList inserts a fixed set of values into the pipeline from a slice or // array. Unlike Create this supports the creation of an empty PCollection. -func CreateList(s Scope, list interface{}) PCollection { +func CreateList(s Scope, list any) PCollection { return Must(TryCreateList(s, list)) } // TryCreate inserts a fixed non-empty set of values into the pipeline. The // values must be of the same type. -func TryCreate(s Scope, values ...interface{}) (PCollection, error) { +func TryCreate(s Scope, values ...any) (PCollection, error) { if len(values) == 0 { err := errors.New("create has no values") return PCollection{}, addCreateCtx(err, s) @@ -53,14 +53,14 @@ func TryCreate(s Scope, values ...interface{}) (PCollection, error) { // TryCreateList inserts a fixed set of values into the pipeline from a slice or // array. The values must be of the same type. Unlike TryCreate this supports // the creation of an empty PCollection. -func TryCreateList(s Scope, list interface{}) (PCollection, error) { +func TryCreateList(s Scope, list any) (PCollection, error) { val := reflect.ValueOf(list) if val.Kind() != reflect.Slice && val.Kind() != reflect.Array { err := errors.Errorf("input %v must be a slice or array", list) return PCollection{}, addCreateCtx(err, s) } - var ret []interface{} + var ret []any for i := 0; i < val.Len(); i++ { ret = append(ret, val.Index(i).Interface()) } @@ -78,7 +78,7 @@ func addCreateCtx(err error, s Scope) error { return errors.WithContextf(err, "inserting Create in scope %s", s) } -func createList(s Scope, values []interface{}, t reflect.Type) (PCollection, error) { +func createList(s Scope, values []any, t reflect.Type) (PCollection, error) { fn := &createFn{Type: EncodedType{T: t}} enc := NewElementEncoder(t) diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go index 3386469eb39..3acfe779bba 100644 --- a/sdks/go/pkg/beam/create_test.go +++ b/sdks/go/pkg/beam/create_test.go @@ -33,16 +33,16 @@ type wc struct { func TestCreate(t *testing.T) { tests := []struct { - values []interface{} + values []any }{ - {[]interface{}{1, 2, 3}}, - {[]interface{}{"1", "2", "3"}}, - {[]interface{}{float32(0.1), float32(0.2), float32(0.3)}}, - {[]interface{}{float64(0.1), float64(0.2), float64(0.3)}}, - {[]interface{}{uint(1), uint(2), uint(3)}}, - {[]interface{}{false, true, true, false, true}}, - {[]interface{}{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}}, - {[]interface{}{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401 + {[]any{1, 2, 3}}, + {[]any{"1", "2", "3"}}, + {[]any{float32(0.1), float32(0.2), float32(0.3)}}, + {[]any{float64(0.1), float64(0.2), float64(0.3)}}, + {[]any{uint(1), uint(2), uint(3)}}, + {[]any{false, true, true, false, true}}, + {[]any{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}}, + {[]any{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401 } for _, test := range tests { @@ -58,7 +58,7 @@ func TestCreate(t *testing.T) { func TestCreateList(t *testing.T) { tests := []struct { - values interface{} + values any }{ {[]int{1, 2, 3}}, {[]string{"1", "2", "3"}}, @@ -74,7 +74,7 @@ func TestCreateList(t *testing.T) { p, s := beam.NewPipelineWithRoot() c := beam.CreateList(s, test.values) - var values []interface{} + var values []any v := reflect.ValueOf(test.values) for i := 0; i < v.Len(); i++ { values = append(values, v.Index(i).Interface()) @@ -89,7 +89,7 @@ func TestCreateList(t *testing.T) { func TestCreateEmptyList(t *testing.T) { tests := []struct { - values interface{} + values any }{ {[]int{}}, {[]string{}}, diff --git a/sdks/go/pkg/beam/encoding.go b/sdks/go/pkg/beam/encoding.go index bc66d601b98..c716f28c5ac 100644 --- a/sdks/go/pkg/beam/encoding.go +++ b/sdks/go/pkg/beam/encoding.go @@ -82,8 +82,8 @@ func (w *EncodedType) UnmarshalJSON(buf []byte) error { return nil } -func encodedTypeEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { - return func(iface interface{}, w io.Writer) error { +func encodedTypeEnc(reflect.Type) (func(any, io.Writer) error, error) { + return func(iface any, w io.Writer) error { if err := coder.WriteSimpleRowHeader(1, w); err != nil { return err } @@ -100,8 +100,8 @@ func encodedTypeEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { nil } -func encodedTypeDec(reflect.Type) (func(io.Reader) (interface{}, error), error) { - return func(r io.Reader) (interface{}, error) { +func encodedTypeDec(reflect.Type) (func(io.Reader) (any, error), error) { + return func(r io.Reader) (any, error) { if err := coder.ReadSimpleRowHeader(1, r); err != nil { return nil, err } @@ -149,8 +149,8 @@ func (w *EncodedFunc) UnmarshalJSON(buf []byte) error { return nil } -func encodedFuncEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { - return func(iface interface{}, w io.Writer) error { +func encodedFuncEnc(reflect.Type) (func(any, io.Writer) error, error) { + return func(iface any, w io.Writer) error { if err := coder.WriteSimpleRowHeader(1, w); err != nil { return err } @@ -167,8 +167,8 @@ func encodedFuncEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { nil } -func encodedFuncDec(reflect.Type) (func(io.Reader) (interface{}, error), error) { - return func(r io.Reader) (interface{}, error) { +func encodedFuncDec(reflect.Type) (func(io.Reader) (any, error), error) { + return func(r io.Reader) (any, error) { if err := coder.ReadSimpleRowHeader(1, r); err != nil { return nil, err } @@ -226,8 +226,8 @@ func (w *EncodedCoder) UnmarshalJSON(buf []byte) error { return nil } -func encodedCoderEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { - return func(iface interface{}, w io.Writer) error { +func encodedCoderEnc(reflect.Type) (func(any, io.Writer) error, error) { + return func(iface any, w io.Writer) error { if err := coder.WriteSimpleRowHeader(1, w); err != nil { return err } @@ -244,8 +244,8 @@ func encodedCoderEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { nil } -func encodedCoderDec(reflect.Type) (func(io.Reader) (interface{}, error), error) { - return func(r io.Reader) (interface{}, error) { +func encodedCoderDec(reflect.Type) (func(io.Reader) (any, error), error) { + return func(r io.Reader) (any, error) { if err := coder.ReadSimpleRowHeader(1, r); err != nil { return nil, err } @@ -262,8 +262,8 @@ func encodedCoderDec(reflect.Type) (func(io.Reader) (interface{}, error), error) nil } -func timeEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { - return func(iface interface{}, w io.Writer) error { +func timeEnc(reflect.Type) (func(any, io.Writer) error, error) { + return func(iface any, w io.Writer) error { if err := coder.WriteSimpleRowHeader(1, w); err != nil { return errors.Wrap(err, "encoding time.Time schema override") } @@ -282,8 +282,8 @@ func timeEnc(reflect.Type) (func(interface{}, io.Writer) error, error) { }, nil } -func timeDec(reflect.Type) (func(io.Reader) (interface{}, error), error) { - return func(r io.Reader) (interface{}, error) { +func timeDec(reflect.Type) (func(io.Reader) (any, error), error) { + return func(r io.Reader) (any, error) { if err := coder.ReadSimpleRowHeader(1, r); err != nil { return nil, errors.Wrap(err, "decoding time.Time schema override") } diff --git a/sdks/go/pkg/beam/example_schema_test.go b/sdks/go/pkg/beam/example_schema_test.go index 8dee0af01a7..f46b9f49ed6 100644 --- a/sdks/go/pkg/beam/example_schema_test.go +++ b/sdks/go/pkg/beam/example_schema_test.go @@ -104,7 +104,7 @@ func (p *AlphabetProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error } // BuildEncoder returns beam schema encoder functions for types with the Alphabet interface. -func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { +func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) { switch rt { case typeCyrillic: if p.enc == nil { @@ -113,7 +113,7 @@ func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.W // Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type. return p.enc.Build(rt) case typeLatin: - return func(iface interface{}, w io.Writer) error { + return func(iface any, w io.Writer) error { v := iface.(*Latin) // Beam Schema Rows have a header that indicates which fields if any, are nil. if err := coder.WriteRowHeader(2, func(i int) bool { @@ -137,7 +137,7 @@ func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.W return nil }, nil case typeΕλληνικά: - return func(iface interface{}, w io.Writer) error { + return func(iface any, w io.Writer) error { // Since the representation for Ελληνικά never has nil fields // we can use the simple header helper. if err := coder.WriteSimpleRowHeader(1, w); err != nil { @@ -154,7 +154,7 @@ func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.W } // BuildDecoder returns beam schema decoder functions for types with the Alphabet interface. -func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { +func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) { switch rt { case typeCyrillic: if p.dec == nil { @@ -163,7 +163,7 @@ func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (inter // Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type. return p.dec.Build(rt) case typeLatin: - return func(r io.Reader) (interface{}, error) { + return func(r io.Reader) (any, error) { // Since the d field can be nil, we use the header get the nil bits. n, nils, err := coder.ReadRowHeader(r) if err != nil { @@ -194,7 +194,7 @@ func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (inter }, nil }, nil case typeΕλληνικά: - return func(r io.Reader) (interface{}, error) { + return func(r io.Reader) (any, error) { // Since the representation for Ελληνικά never has nil fields // we can use the simple header helper. Returns an error if // something unexpected occurs. diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go index ea437c77687..210c39ab4e4 100644 --- a/sdks/go/pkg/beam/forward.go +++ b/sdks/go/pkg/beam/forward.go @@ -62,7 +62,7 @@ func init() { // and is needed for functions -- such as custom coders -- serialized during unit // tests, where the underlying symbol table is not available. It should be called // in `init()` only. -func RegisterFunction(fn interface{}) { +func RegisterFunction(fn any) { runtime.RegisterFunction(fn) } @@ -84,7 +84,7 @@ func RegisterFunction(fn interface{}) { // beam.RegisterDoFn(FunctionalDoFn) // beam.RegisterDoFn(reflect.TypeOf((*StructuralDoFn)(nil)).Elem()) // } -func RegisterDoFn(dofn interface{}) { +func RegisterDoFn(dofn any) { genx.RegisterDoFn(dofn) } @@ -128,7 +128,7 @@ func RegisterInit(hook func()) { // func(reflect.Type, []byte) (T, error) // // where T is the matching user type. -func RegisterCoder(t reflect.Type, encoder, decoder interface{}) { +func RegisterCoder(t reflect.Type, encoder, decoder any) { runtime.RegisterType(t) runtime.RegisterFunction(encoder) runtime.RegisterFunction(decoder) diff --git a/sdks/go/pkg/beam/option.go b/sdks/go/pkg/beam/option.go index db8f4331e9b..76b7e37383e 100644 --- a/sdks/go/pkg/beam/option.go +++ b/sdks/go/pkg/beam/option.go @@ -30,8 +30,8 @@ type Option interface { type SideInput struct { Input PCollection - // WindowFn interface{} - // ViewFn interface{} + // WindowFn any + // ViewFn any } func (s SideInput) private() {} diff --git a/sdks/go/pkg/beam/options/jobopts/stringSlice.go b/sdks/go/pkg/beam/options/jobopts/stringSlice.go index fe390305c72..9544a0d81a4 100644 --- a/sdks/go/pkg/beam/options/jobopts/stringSlice.go +++ b/sdks/go/pkg/beam/options/jobopts/stringSlice.go @@ -46,6 +46,6 @@ func (s *stringSlice) Set(value string) error { } // Get returns the instance itself. -func (s stringSlice) Get() interface{} { +func (s stringSlice) Get() any { return s } diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index 6f4c05e14f4..e1cdc4f417c 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -36,7 +36,7 @@ func addParDoCtx(err error, s Scope) error { // TryParDo attempts to insert a ParDo transform into the pipeline. It may fail // for multiple reasons, notably that the dofn is not valid or cannot be bound // -- due to type mismatch, say -- to the incoming PCollections. -func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCollection, error) { +func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection, error) { side, typedefs, err := validate(s, col, opts) if err != nil { return nil, addParDoCtx(err, s) @@ -126,12 +126,12 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo } // ParDoN inserts a ParDo with any number of outputs into the pipeline. -func ParDoN(s Scope, dofn interface{}, col PCollection, opts ...Option) []PCollection { +func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection { return MustN(TryParDo(s, dofn, col, opts...)) } // ParDo0 inserts a ParDo with zero output transform into the pipeline. -func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) { +func ParDo0(s Scope, dofn any, col PCollection, opts ...Option) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 0 { panic(formatParDoError(dofn, len(ret), 0)) @@ -402,7 +402,7 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) { // // See https://beam.apache.org/documentation/programming-guide/#pardo // for the web documentation for ParDo -func ParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) PCollection { +func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 1 { panic(formatParDoError(dofn, len(ret), 1)) @@ -413,7 +413,7 @@ func ParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) PCollecti // TODO(herohde) 6/1/2017: add windowing aspects to above documentation. // ParDo2 inserts a ParDo with 2 outputs into the pipeline. -func ParDo2(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection) { +func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 2 { panic(formatParDoError(dofn, len(ret), 2)) @@ -422,7 +422,7 @@ func ParDo2(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec } // ParDo3 inserts a ParDo with 3 outputs into the pipeline. -func ParDo3(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection) { +func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 3 { panic(formatParDoError(dofn, len(ret), 3)) @@ -431,7 +431,7 @@ func ParDo3(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec } // ParDo4 inserts a ParDo with 4 outputs into the pipeline. -func ParDo4(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection) { +func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 4 { panic(formatParDoError(dofn, len(ret), 4)) @@ -440,7 +440,7 @@ func ParDo4(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec } // ParDo5 inserts a ParDo with 5 outputs into the pipeline. -func ParDo5(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection) { +func ParDo5(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 5 { panic(formatParDoError(dofn, len(ret), 5)) @@ -449,7 +449,7 @@ func ParDo5(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec } // ParDo6 inserts a ParDo with 6 outputs into the pipeline. -func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) { +func ParDo6(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 6 { panic(formatParDoError(dofn, len(ret), 6)) @@ -458,7 +458,7 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec } // ParDo7 inserts a ParDo with 7 outputs into the pipeline. -func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) { +func ParDo7(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) { ret := MustN(TryParDo(s, dofn, col, opts...)) if len(ret) != 7 { panic(formatParDoError(dofn, len(ret), 7)) @@ -472,7 +472,7 @@ func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec // We construct a new graph.Fn using the doFn which is passed. We explicitly // ignore the error since we already know that its already a DoFn type as // TryParDo would have panicked otherwise. -func formatParDoError(doFn interface{}, emitSize int, parDoSize int) string { +func formatParDoError(doFn any, emitSize int, parDoSize int) string { doFun, _ := graph.NewFn(doFn) doFnName := doFun.Name() diff --git a/sdks/go/pkg/beam/partition.go b/sdks/go/pkg/beam/partition.go index a2cb1c78664..37498ddbc0b 100644 --- a/sdks/go/pkg/beam/partition.go +++ b/sdks/go/pkg/beam/partition.go @@ -40,7 +40,7 @@ var ( // A PartitionFn has the signature `func(T) int.` // // T is permitted to be a KV. -func Partition(s Scope, n int, fn interface{}, col PCollection) []PCollection { +func Partition(s Scope, n int, fn any, col PCollection) []PCollection { s = s.Scope(fmt.Sprintf("Partition(%v)", n)) if n < 1 { @@ -106,20 +106,20 @@ func (f *partitionFn) Type() reflect.Type { return f.t } -func (f *partitionFn) Call(args []interface{}) []interface{} { +func (f *partitionFn) Call(args []any) []any { timestamp := args[0] value := args[1] n := f.fn.Call1x1(value).(int) if n < 0 || n >= f.n { - return []interface{}{errors.Errorf("partitionFn(%v) = %v, want [0,%v)", value, n, f.n)} + return []any{errors.Errorf("partitionFn(%v) = %v, want [0,%v)", value, n, f.n)} } emit := args[n+2] reflectx.MakeFunc2x0(emit).Call2x0(timestamp, value) var err error - return []interface{}{err} + return []any{err} } // partitionFnKV is a Func with the following underlying type: @@ -144,21 +144,21 @@ func (f *partitionFnKV) Type() reflect.Type { return f.t } -func (f *partitionFnKV) Call(args []interface{}) []interface{} { +func (f *partitionFnKV) Call(args []any) []any { timestamp := args[0] key := args[1] value := args[2] n := f.fnKV.Call2x1(key, value).(int) if n < 0 || n >= f.n { - return []interface{}{errors.Errorf("partitionFn(%v) = %v, want [0,%v)", value, n, f.n)} + return []any{errors.Errorf("partitionFn(%v) = %v, want [0,%v)", value, n, f.n)} } emit := args[n+3] reflectx.MakeFunc3x0(emit).Call3x0(timestamp, key, value) var err error - return []interface{}{err} + return []any{err} } func makePartitionFn(name string, t reflect.Type, enc []byte) reflectx.Func { diff --git a/sdks/go/pkg/beam/partition_test.go b/sdks/go/pkg/beam/partition_test.go index e2b9c5a31b6..3297d00d41d 100644 --- a/sdks/go/pkg/beam/partition_test.go +++ b/sdks/go/pkg/beam/partition_test.go @@ -62,7 +62,7 @@ func TestPartition(t *testing.T) { tests := []struct { in []int n int - fn interface{} + fn any out0 []int }{ { @@ -122,7 +122,7 @@ func TestPartitionKV(t *testing.T) { tests := []struct { in []kvIntInt n int - fn interface{} + fn any out0 []kvIntInt }{ { @@ -156,7 +156,7 @@ func TestPartitionFailures(t *testing.T) { tests := []struct { in []int n int - fn interface{} + fn any }{ { []int{1, 2}, @@ -189,7 +189,7 @@ func TestPartitionFlattenIdentity(t *testing.T) { tests := []struct { in []int n int - fn interface{} + fn any }{ { []int{1, 2, 3, 4}, diff --git a/sdks/go/pkg/beam/provision/provision.go b/sdks/go/pkg/beam/provision/provision.go index 934a74c124c..126208bb354 100644 --- a/sdks/go/pkg/beam/provision/provision.go +++ b/sdks/go/pkg/beam/provision/provision.go @@ -51,7 +51,7 @@ func Info(ctx context.Context, endpoint string) (*fnpb.ProvisionInfo, error) { } // OptionsToProto converts pipeline options to a proto struct via JSON. -func OptionsToProto(v interface{}) (*google_pb.Struct, error) { +func OptionsToProto(v any) (*google_pb.Struct, error) { data, err := json.Marshal(v) if err != nil { return nil, err @@ -69,7 +69,7 @@ func JSONToProto(data string) (*google_pb.Struct, error) { } // ProtoToOptions converts pipeline options from a proto struct via JSON. -func ProtoToOptions(opt *google_pb.Struct, v interface{}) error { +func ProtoToOptions(opt *google_pb.Struct, v any) error { data, err := ProtoToJSON(opt) if err != nil { return err diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go index 86a5ae8f98b..ece238d2471 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go @@ -27,7 +27,7 @@ import ( ) // newMsg creates a json-encoded RawMessage. Panics if encoding fails. -func newMsg(msg interface{}) googleapi.RawMessage { +func newMsg(msg any) googleapi.RawMessage { data, err := json.Marshal(msg) if err != nil { panic(err) @@ -38,7 +38,7 @@ func newMsg(msg interface{}) googleapi.RawMessage { // pipelineOptions models Job/Environment/SdkPipelineOptions type pipelineOptions struct { DisplayData []*displayData `json:"display_data,omitempty"` - Options interface{} `json:"options,omitempty"` + Options any `json:"options,omitempty"` GoOptions runtime.RawOptions `json:"beam:option:go_options:v1,omitempty"` } @@ -161,15 +161,15 @@ func newOutputReference(step, output string) *outputReference { } type displayData struct { - Key string `json:"key,omitempty"` - Label string `json:"label,omitempty"` - Namespace string `json:"namespace,omitempty"` - ShortValue string `json:"shortValue,omitempty"` - Type string `json:"type,omitempty"` - Value interface{} `json:"value,omitempty"` + Key string `json:"key,omitempty"` + Label string `json:"label,omitempty"` + Namespace string `json:"namespace,omitempty"` + ShortValue string `json:"shortValue,omitempty"` + Type string `json:"type,omitempty"` + Value any `json:"value,omitempty"` } -func findDisplayDataType(value interface{}) (string, interface{}) { +func findDisplayDataType(value any) (string, any) { switch value.(type) { case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: return "INTEGER", value @@ -182,7 +182,7 @@ func findDisplayDataType(value interface{}) (string, interface{}) { } } -func newDisplayData(key, label, namespace string, value interface{}) *displayData { +func newDisplayData(key, label, namespace string, value any) *displayData { t, v := findDisplayDataType(value) return &displayData{ diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go index f4ccf5f673d..935d23b0fdd 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go @@ -95,7 +95,7 @@ func extractKey(metric *df.MetricUpdate, p *pipepb.Pipeline) (metrics.StepKey, e return metrics.StepKey{Step: userStepName, Name: metric.Name.Name, Namespace: namespace}, nil } -func extractCounterValue(obj interface{}) (int64, error) { +func extractCounterValue(obj any) (int64, error) { v, ok := obj.(float64) if !ok { return -1, fmt.Errorf("expected float64, got data of type %T instead", obj) @@ -103,8 +103,8 @@ func extractCounterValue(obj interface{}) (int64, error) { return int64(v), nil } -func extractDistributionValue(obj interface{}) (metrics.DistributionValue, error) { - m := obj.(map[string]interface{}) +func extractDistributionValue(obj any) (metrics.DistributionValue, error) { + m := obj.(map[string]any) propertiesToVisit := []string{"count", "sum", "min", "max"} var values [4]int64 diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics_test.go index 3c0adab3c91..94f6b3f00de 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics_test.go @@ -74,7 +74,7 @@ func TestFromMetricUpdates_Distributions(t *testing.T) { Name: "customDist", Namespace: "customDoFn", }} - distribution := map[string]interface{}{ + distribution := map[string]any{ "count": 100.0, "sum": 5.0, "min": -12.0, diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go index 8e92ff72d0b..e831930a618 100644 --- a/sdks/go/pkg/beam/runners/direct/buffer.go +++ b/sdks/go/pkg/beam/runners/direct/buffer.go @@ -71,7 +71,7 @@ func (n *buffer) NewIterable(ctx context.Context, reader exec.StateReader, w typ return &exec.FixedReStream{Buf: n.buf}, nil } -func (n *buffer) NewKeyedIterable(ctx context.Context, reader exec.StateReader, w typex.Window, iterKey interface{}) (exec.ReStream, error) { +func (n *buffer) NewKeyedIterable(ctx context.Context, reader exec.StateReader, w typex.Window, iterKey any) (exec.ReStream, error) { return n.NewIterable(ctx, reader, w) } diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go index 581f6ee0cbb..131fa0b1ec1 100644 --- a/sdks/go/pkg/beam/runners/vet/vet.go +++ b/sdks/go/pkg/beam/runners/vet/vet.go @@ -466,7 +466,7 @@ func (e *Eval) diag(s string) { } // diag invokes fmt.Fprintf on the diagnostic buffer. -func (e *Eval) diagf(f string, args ...interface{}) { +func (e *Eval) diagf(f string, args ...any) { fmt.Fprintf(&e.d, f, args...) } @@ -476,7 +476,7 @@ func (e *Eval) Print(s string) { } // Printf invokes fmt.Fprintf on the Eval buffer. -func (e *Eval) Printf(f string, args ...interface{}) { +func (e *Eval) Printf(f string, args ...any) { fmt.Fprintf(&e.w, f, args...) } @@ -485,7 +485,7 @@ func (e *Eval) Bytes() []byte { return e.w.Bytes() } -// We need to take graph.Fns (which can be created from interface{} from graph.NewFn) +// We need to take graph.Fns (which can be created from any from graph.NewFn) // and convert them to all needed function caller signatures, // and emitters. // diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go index 140956328c1..611d0adf21d 100644 --- a/sdks/go/pkg/beam/schema.go +++ b/sdks/go/pkg/beam/schema.go @@ -54,7 +54,7 @@ import ( // // RegisterSchemaProvider must be called before beam.Init(), and conventionally // is called in a package init() function. -func RegisterSchemaProvider(rt reflect.Type, provider interface{}) { +func RegisterSchemaProvider(rt reflect.Type, provider any) { p := provider.(SchemaProvider) switch rt.Kind() { case reflect.Interface: @@ -84,7 +84,7 @@ func RegisterSchemaProvider(rt reflect.Type, provider interface{}) { // // RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally // is called in a package init() function. -func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn string) { +func RegisterSchemaProviderWithURN(rt reflect.Type, provider any, urn string) { p := provider.(SchemaProvider) st, err := p.FromLogicalType(rt) if err != nil { @@ -102,6 +102,6 @@ func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn st // Sepearated out the acting type from the provider implementation is good. type SchemaProvider interface { FromLogicalType(reflect.Type) (reflect.Type, error) - BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) - BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) + BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) + BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) } diff --git a/sdks/go/pkg/beam/transforms/filter/distinct_test.go b/sdks/go/pkg/beam/transforms/filter/distinct_test.go index 1ba8f560190..9f073b339fa 100644 --- a/sdks/go/pkg/beam/transforms/filter/distinct_test.go +++ b/sdks/go/pkg/beam/transforms/filter/distinct_test.go @@ -45,32 +45,32 @@ type s struct { func TestDedup(t *testing.T) { tests := []struct { - dups []interface{} - exp []interface{} + dups []any + exp []any }{ { - []interface{}{1, 2, 3}, - []interface{}{1, 2, 3}, + []any{1, 2, 3}, + []any{1, 2, 3}, }, { - []interface{}{3, 2, 1}, - []interface{}{1, 2, 3}, + []any{3, 2, 1}, + []any{1, 2, 3}, }, { - []interface{}{1, 1, 1, 2, 3}, - []interface{}{1, 2, 3}, + []any{1, 1, 1, 2, 3}, + []any{1, 2, 3}, }, { - []interface{}{1, 2, 3, 2, 2, 2, 3, 1, 1, 1, 2, 3, 1}, - []interface{}{1, 2, 3}, + []any{1, 2, 3, 2, 2, 2, 3, 1, 1, 1, 2, 3, 1}, + []any{1, 2, 3}, }, { - []interface{}{"1", "2", "3", "2", "1"}, - []interface{}{"1", "2", "3"}, + []any{"1", "2", "3", "2", "1"}, + []any{"1", "2", "3"}, }, { - []interface{}{s{1, "a"}, s{2, "a"}, s{1, "a"}}, - []interface{}{s{1, "a"}, s{2, "a"}}, + []any{s{1, "a"}, s{2, "a"}, s{1, "a"}}, + []any{s{1, "a"}, s{2, "a"}}, }, } diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go b/sdks/go/pkg/beam/transforms/filter/filter.go index 8966ad524cc..913e7355c30 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter.go +++ b/sdks/go/pkg/beam/transforms/filter/filter.go @@ -42,7 +42,7 @@ var ( // }) // // Here, "short" will contain "a" and "b" at runtime. -func Include(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection { +func Include(s beam.Scope, col beam.PCollection, fn any) beam.PCollection { s = s.Scope("filter.Include") funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type())) @@ -60,7 +60,7 @@ func Include(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollectio // }) // // Here, "long" will contain "long" and "alsolong" at runtime. -func Exclude(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection { +func Exclude(s beam.Scope, col beam.PCollection, fn any) beam.PCollection { s = s.Scope("filter.Exclude") funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type())) diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index b324cf987c0..788351e0917 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -38,7 +38,7 @@ import ( func TestInclude(t *testing.T) { tests := []struct { in []int - fn interface{} + fn any exp []int }{ { @@ -71,7 +71,7 @@ func TestInclude(t *testing.T) { func TestExclude(t *testing.T) { tests := []struct { in []int - fn interface{} + fn any exp []int }{ { diff --git a/sdks/go/pkg/beam/transforms/stats/max_switch.go b/sdks/go/pkg/beam/transforms/stats/max_switch.go index b31d41aa6dc..d97c8c1d6b9 100644 --- a/sdks/go/pkg/beam/transforms/stats/max_switch.go +++ b/sdks/go/pkg/beam/transforms/stats/max_switch.go @@ -22,7 +22,7 @@ import ( "reflect" ) -func findMaxFn(t reflect.Type) interface{} { +func findMaxFn(t reflect.Type) any { switch t.String() { case "int": return maxIntFn diff --git a/sdks/go/pkg/beam/transforms/stats/max_switch.tmpl b/sdks/go/pkg/beam/transforms/stats/max_switch.tmpl index 9424569a62a..a7febe4a279 100644 --- a/sdks/go/pkg/beam/transforms/stats/max_switch.tmpl +++ b/sdks/go/pkg/beam/transforms/stats/max_switch.tmpl @@ -20,7 +20,7 @@ import ( "reflect" ) -func findMaxFn(t reflect.Type) interface{} { +func findMaxFn(t reflect.Type) any { switch t.String() { {{- range .X}} case "{{.Type}}": diff --git a/sdks/go/pkg/beam/transforms/stats/mean.go b/sdks/go/pkg/beam/transforms/stats/mean.go index 4324e31d5a8..0cc9e2e9403 100644 --- a/sdks/go/pkg/beam/transforms/stats/mean.go +++ b/sdks/go/pkg/beam/transforms/stats/mean.go @@ -69,7 +69,7 @@ func (f *meanFn) CreateAccumulator() meanAccum { func (f *meanFn) AddInput(a meanAccum, val beam.T) meanAccum { a.Count++ - a.Sum += reflect.ValueOf(val.(interface{})).Convert(reflectx.Float64).Interface().(float64) + a.Sum += reflect.ValueOf(val.(any)).Convert(reflectx.Float64).Interface().(float64) return a } diff --git a/sdks/go/pkg/beam/transforms/stats/min_switch.go b/sdks/go/pkg/beam/transforms/stats/min_switch.go index 75dfe729ec3..64b806b30c7 100644 --- a/sdks/go/pkg/beam/transforms/stats/min_switch.go +++ b/sdks/go/pkg/beam/transforms/stats/min_switch.go @@ -22,7 +22,7 @@ import ( "reflect" ) -func findMinFn(t reflect.Type) interface{} { +func findMinFn(t reflect.Type) any { switch t.String() { case "int": return minIntFn diff --git a/sdks/go/pkg/beam/transforms/stats/min_switch.tmpl b/sdks/go/pkg/beam/transforms/stats/min_switch.tmpl index 0dd082d785c..cc173d80e56 100644 --- a/sdks/go/pkg/beam/transforms/stats/min_switch.tmpl +++ b/sdks/go/pkg/beam/transforms/stats/min_switch.tmpl @@ -20,7 +20,7 @@ import ( "reflect" ) -func findMinFn(t reflect.Type) interface{} { +func findMinFn(t reflect.Type) any { switch t.String() { {{- range .X}} case "{{.Type}}": diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 17f2a1dad69..79a66b58e1f 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -69,11 +69,11 @@ type sortListHeap struct { less reflectx.Func2x1 } -func (s sortListHeap) Len() int { return len(s.data) } -func (s sortListHeap) Less(i, j int) bool { return s.less.Call2x1(s.data[i][0], s.data[j][0]).(bool) } -func (s sortListHeap) Swap(i, j int) { s.data[i], s.data[j] = s.data[j], s.data[i] } -func (s *sortListHeap) Push(x interface{}) { s.data = append(s.data, x.([]beam.T)) } -func (s *sortListHeap) Pop() interface{} { +func (s sortListHeap) Len() int { return len(s.data) } +func (s sortListHeap) Less(i, j int) bool { return s.less.Call2x1(s.data[i][0], s.data[j][0]).(bool) } +func (s sortListHeap) Swap(i, j int) { s.data[i], s.data[j] = s.data[j], s.data[i] } +func (s *sortListHeap) Push(x any) { s.data = append(s.data, x.([]beam.T)) } +func (s *sortListHeap) Pop() any { var x beam.T x, s.data = s.data[len(s.data)-1], s.data[:len(s.data)-1] return x @@ -252,7 +252,7 @@ func (c *compactor) sort(less reflectx.Func2x1) []beam.T { heap.Push(&h, s[1:]) } } - c.sorted = [][]beam.T{mergeSorted(sorted, c.unsorted, func(a, b interface{}) bool { return less.Call2x1(a, b).(bool) })} + c.sorted = [][]beam.T{mergeSorted(sorted, c.unsorted, func(a, b any) bool { return less.Call2x1(a, b).(bool) })} c.unsorted = nil if len(c.sorted[0]) == 0 { c.sorted = nil @@ -388,7 +388,7 @@ func decodeCompactors(data []byte) (*compactors, error) { } // mergeSorted takes two slices which are already sorted and returns a new slice containing all elements sorted together. -func mergeSorted(a, b []beam.T, less func(interface{}, interface{}) bool) []beam.T { +func mergeSorted(a, b []beam.T, less func(any, any) bool) []beam.T { output := make([]beam.T, 0, len(a)+len(b)) for len(a) > 0 && len(b) > 0 { if less(a[0], b[0]) { @@ -408,7 +408,7 @@ func mergeSorted(a, b []beam.T, less func(interface{}, interface{}) bool) []beam } // mergeSortedWeighted takes two slices which are already sorted and returns a new slice containing all elements sorted together. -func mergeSortedWeighted(a, b []weightedElement, less func(interface{}, interface{}) bool) []weightedElement { +func mergeSortedWeighted(a, b []weightedElement, less func(any, any) bool) []weightedElement { output := make([]weightedElement, 0, len(a)+len(b)) for len(a) > 0 && len(b) > 0 { if less(a[0], b[0]) { @@ -558,7 +558,7 @@ func toWeightedSlice(compactor compactor, less reflectx.Func2x1, weight int) []w func (f *approximateQuantilesOutputFn) ExtractOutput(ctx context.Context, compactors *compactors) []beam.T { sorted := toWeightedSlice(compactors.Compactors[0], f.State.less, 1) for level, compactor := range compactors.Compactors[1:] { - sorted = mergeSortedWeighted(sorted, toWeightedSlice(compactor, f.State.less, 1<<uint(level)), func(a, b interface{}) bool { + sorted = mergeSortedWeighted(sorted, toWeightedSlice(compactor, f.State.less, 1<<uint(level)), func(a, b any) bool { return f.State.less.Call2x1(a.(weightedElement).element, b.(weightedElement).element).(bool) }) } @@ -667,7 +667,7 @@ func makeWeightedElement(weight int, element beam.T) weightedElement { // // The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles. // For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater. -func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less interface{}, opts Opts) beam.PCollection { +func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection { return ApproximateWeightedQuantiles(s, beam.ParDo(s, func(e beam.T) (int, beam.T) { return 1, e }, pc), less, opts) } @@ -693,7 +693,7 @@ func reduce(s beam.Scope, weightedElements beam.PCollection, state approximateQu // // The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles. // For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater or equal. -func ApproximateWeightedQuantiles(s beam.Scope, pc beam.PCollection, less interface{}, opts Opts) beam.PCollection { +func ApproximateWeightedQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection { _, t := beam.ValidateKVType(pc) state := approximateQuantilesCombineFnState{ K: opts.K, diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go index e1f6b231b42..c03620d0b9b 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go @@ -31,7 +31,7 @@ func init() { // In practice, this runs faster than plain reflection. // TODO(https://github.com/apache/beam/issues/20271): Remove once collisions don't occur for starcgen over test code and an equivalent is generated for us. - reflectx.RegisterFunc(reflect.ValueOf(less).Type(), func(_ interface{}) reflectx.Func { + reflectx.RegisterFunc(reflect.ValueOf(less).Type(), func(_ any) reflectx.Func { return newIntLess() }) } @@ -54,8 +54,8 @@ func (i *intLess) Name() string { func (i *intLess) Type() reflect.Type { return i.t } -func (i *intLess) Call(args []interface{}) []interface{} { - return []interface{}{args[0].(int) < args[1].(int)} +func (i *intLess) Call(args []any) []any { + return []any{args[0].(int) < args[1].(int)} } func less(a, b int) bool { diff --git a/sdks/go/pkg/beam/transforms/stats/sum_switch.go b/sdks/go/pkg/beam/transforms/stats/sum_switch.go index cfea44a45d2..60bff369ae1 100644 --- a/sdks/go/pkg/beam/transforms/stats/sum_switch.go +++ b/sdks/go/pkg/beam/transforms/stats/sum_switch.go @@ -22,7 +22,7 @@ import ( "reflect" ) -func findSumFn(t reflect.Type) interface{} { +func findSumFn(t reflect.Type) any { switch t.String() { case "int": return sumIntFn diff --git a/sdks/go/pkg/beam/transforms/stats/sum_switch.tmpl b/sdks/go/pkg/beam/transforms/stats/sum_switch.tmpl index e431318a39d..7bb764a7f42 100644 --- a/sdks/go/pkg/beam/transforms/stats/sum_switch.tmpl +++ b/sdks/go/pkg/beam/transforms/stats/sum_switch.tmpl @@ -20,7 +20,7 @@ import ( "reflect" ) -func findSumFn(t reflect.Type) interface{} { +func findSumFn(t reflect.Type) any { switch t.String() { {{- range .X}} case "{{.Type}}": diff --git a/sdks/go/pkg/beam/transforms/stats/util.go b/sdks/go/pkg/beam/transforms/stats/util.go index c1bb8974c56..ff65375d681 100644 --- a/sdks/go/pkg/beam/transforms/stats/util.go +++ b/sdks/go/pkg/beam/transforms/stats/util.go @@ -32,7 +32,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" ) -func combine(s beam.Scope, makeCombineFn func(reflect.Type) interface{}, col beam.PCollection) beam.PCollection { +func combine(s beam.Scope, makeCombineFn func(reflect.Type) any, col beam.PCollection) beam.PCollection { t := beam.ValidateNonCompositeType(col) validateNonComplexNumber(t.Type()) @@ -41,7 +41,7 @@ func combine(s beam.Scope, makeCombineFn func(reflect.Type) interface{}, col bea return beam.Combine(s, makeCombineFn(t.Type()), col) } -func combinePerKey(s beam.Scope, makeCombineFn func(reflect.Type) interface{}, col beam.PCollection) beam.PCollection { +func combinePerKey(s beam.Scope, makeCombineFn func(reflect.Type) any, col beam.PCollection) beam.PCollection { _, t := beam.ValidateKVType(col) validateNonComplexNumber(t.Type()) diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go index 38fcc3bba3a..f93786cd229 100644 --- a/sdks/go/pkg/beam/transforms/top/top.go +++ b/sdks/go/pkg/beam/transforms/top/top.go @@ -51,7 +51,7 @@ var ( // // col := beam.Create(s, 1, 11, 7, 5, 10) // top2 := stats.Largest(s, col, 2, less) // PCollection<[]int> with [11, 10] as the only element. -func Largest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.PCollection { +func Largest(s beam.Scope, col beam.PCollection, n int, less any) beam.PCollection { s = s.Scope(fmt.Sprintf("top.Largest(%v)", n)) t := beam.ValidateNonCompositeType(col) @@ -63,7 +63,7 @@ func Largest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.P // LargestPerKey returns the largest N values for each key of a PCollection<KV<K,T>>. // The order is defined by the comparator, less : T x T -> bool. It returns a // PCollection<KV<K,[]T>> with a slice of the N largest elements for each key. -func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.PCollection { +func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less any) beam.PCollection { s = s.Scope(fmt.Sprintf("top.LargestPerKey(%v)", n)) _, t := beam.ValidateKVType(col) @@ -80,7 +80,7 @@ func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{}) // // col := beam.Create(s, 1, 11, 7, 5, 10) // bottom2 := stats.Smallest(s, col, 2, less) // PCollection<[]int> with [1, 5] as the only element. -func Smallest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.PCollection { +func Smallest(s beam.Scope, col beam.PCollection, n int, less any) beam.PCollection { s = s.Scope(fmt.Sprintf("top.Smallest(%v)", n)) t := beam.ValidateNonCompositeType(col) @@ -92,7 +92,7 @@ func Smallest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam. // SmallestPerKey returns the smallest N values for each key of a PCollection<KV<K,T>>. // The order is defined by the comparator, less : T x T -> bool. It returns a // PCollection<KV<K,[]T>> with a slice of the N smallest elements for each key. -func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.PCollection { +func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less any) beam.PCollection { s = s.Scope(fmt.Sprintf("top.SmallestPerKey(%v)", n)) _, t := beam.ValidateKVType(col) @@ -101,14 +101,14 @@ func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{}) return beam.CombinePerKey(s, newCombineFn(less, n, t.Type(), true), col) } -func validate(t typex.FullType, n int, less interface{}) { +func validate(t typex.FullType, n int, less any) { if n < 1 { panic("n must be > 0") } funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type())) } -func newCombineFn(less interface{}, n int, t reflect.Type, reversed bool) *combineFn { +func newCombineFn(less any, n int, t reflect.Type, reversed bool) *combineFn { fn := &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Type: beam.EncodedType{T: t}, Reversed: reversed} // Running SetupFn at pipeline construction helps validate the // combineFn, and simplify testing. @@ -124,7 +124,7 @@ type accum struct { data [][]byte // list stores the elements of type A in order. It has at most size N. - list []interface{} + list []any } func (a *accum) unmarshal() error { @@ -258,7 +258,7 @@ func (f *combineFn) ExtractOutput(a accum) []beam.T { return ret } -func (f *combineFn) trim(ret []interface{}) accum { +func (f *combineFn) trim(ret []any) accum { if f.less == nil { f.less = reflectx.ToFunc2x1(f.Less.Fn) } diff --git a/sdks/go/pkg/beam/transforms/xlang/python/external.go b/sdks/go/pkg/beam/transforms/xlang/python/external.go index 2c7c529b143..a5a01d52aab 100644 --- a/sdks/go/pkg/beam/transforms/xlang/python/external.go +++ b/sdks/go/pkg/beam/transforms/xlang/python/external.go @@ -87,24 +87,24 @@ func (p *callableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, } // BuildEncoder encodes the PythonCallableSource logical type -func (p *callableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { +func (p *callableSourceProvider) BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } - return func(iface interface{}, w io.Writer) error { + return func(iface any, w io.Writer) error { v := iface.(CallableSource) return coder.EncodeStringUTF8(string(v), w) }, nil } // BuildDecoder decodes the PythonCallableSource logical type -func (p *callableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { +func (p *callableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } - return func(r io.Reader) (interface{}, error) { + return func(r io.Reader) (any, error) { s, err := coder.DecodeStringUTF8(r) if err != nil { return nil, err diff --git a/sdks/go/pkg/beam/util.go b/sdks/go/pkg/beam/util.go index 3616ef2ee0d..d591dedd762 100644 --- a/sdks/go/pkg/beam/util.go +++ b/sdks/go/pkg/beam/util.go @@ -36,7 +36,7 @@ func NewPipelineWithRoot() (*Pipeline, Scope) { // Seq is a convenience helper to chain single-input/single-output ParDos together // in a sequence. -func Seq(s Scope, col PCollection, dofns ...interface{}) PCollection { +func Seq(s Scope, col PCollection, dofns ...any) PCollection { cur := col for _, dofn := range dofns { cur = ParDo(s, dofn, cur) diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go index 1fe26fc714a..a418bcbd715 100644 --- a/sdks/go/pkg/beam/xlang.go +++ b/sdks/go/pkg/beam/xlang.go @@ -63,7 +63,7 @@ func UnnamedOutputTag() string { // Data string // } // encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"}) -func CrossLanguagePayload(pl interface{}) []byte { +func CrossLanguagePayload(pl any) []byte { bytes, err := xlangx.EncodeStructPayload(pl) if err != nil { panic(err) diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go index 04dc66f4739..75e2fdcb10c 100644 --- a/sdks/go/test/integration/flags.go +++ b/sdks/go/test/integration/flags.go @@ -127,6 +127,6 @@ func (s *stringSlice) Set(value string) error { } // Get returns the instance itself. -func (s stringSlice) Get() interface{} { +func (s stringSlice) Get() any { return s } diff --git a/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go index ca60a5b8677..884d7662da7 100644 --- a/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go +++ b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go @@ -18,7 +18,6 @@ package bigquery import ( "flag" "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "log" "math/rand" "reflect" @@ -26,6 +25,8 @@ import ( "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" @@ -116,7 +117,7 @@ func (fn *CreateTestRowsFn) ProcessElement(_ []byte, emit func(TestRow)) { } // WritePipeline creates a pipeline that writes elements created by createFn into a BigQuery table. -func WritePipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline { +func WritePipeline(expansionAddr, table string, createFn any) *beam.Pipeline { p := beam.NewPipeline() s := p.Root() @@ -131,7 +132,7 @@ func WritePipeline(expansionAddr, table string, createFn interface{}) *beam.Pipe // ReadPipeline creates a pipeline that reads elements directly from a BigQuery table and asserts // that they match elements created by createFn. -func ReadPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline { +func ReadPipeline(expansionAddr, table string, createFn any) *beam.Pipeline { p := beam.NewPipeline() s := p.Root() @@ -180,7 +181,7 @@ func castFn(elm TestRowPtrs) TestRow { // ReadPipeline creates a pipeline that reads elements from a BigQuery table via a SQL Query, and // asserts that they match elements created by createFn. -func ReadFromQueryPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline { +func ReadFromQueryPipeline(expansionAddr, table string, createFn any) *beam.Pipeline { p := beam.NewPipeline() s := p.Root() diff --git a/sdks/go/test/integration/primitives/windowinto.go b/sdks/go/test/integration/primitives/windowinto.go index e270175ec1c..d33e464b76f 100644 --- a/sdks/go/test/integration/primitives/windowinto.go +++ b/sdks/go/test/integration/primitives/windowinto.go @@ -57,7 +57,7 @@ func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam. windowSize := 3 * time.Second - validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) { + validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...any) { // Window the data. windowed := beam.WindowInto(s, wfn, in) // Perform the appropriate sum operation. @@ -107,7 +107,7 @@ func ValidateWindowedSideInputs(s beam.Scope) { windowSize := 1 * time.Second - validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) { + validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...any) { wData := beam.WindowInto(s, wfn, in) wSide := beam.WindowInto(s, sideFn, side) @@ -149,7 +149,7 @@ func sumSideInputs(input int, iter func(*int) bool, emit func(int)) { emit(sum) } -func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected ...interface{}) { +func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected ...any) { windowed := beam.WindowInto(s, wfn, in, opts...) sums := stats.Sum(s, windowed) sums = beam.WindowInto(s, window.NewGlobalWindows(), sums) diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go index 5fddc6226dd..2566a88cbb2 100644 --- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go +++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go @@ -65,8 +65,8 @@ type Coder struct { } type logger interface { - Errorf(string, ...interface{}) - Logf(string, ...interface{}) + Errorf(string, ...any) + Logf(string, ...any) } // Spec is a set of conditions that a coder must pass. @@ -180,7 +180,7 @@ var cmpOpts = []cmp.Option{ } func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { - var got, want interface{} + var got, want any switch c.Urn { case "beam:coder:bytes:v1": got = string(elem.Elm.([]byte)) @@ -254,7 +254,7 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { case "beam:coder:interval_window:v1": var a, b int val := eg.Value - if is, ok := eg.Value.([]interface{}); ok { + if is, ok := eg.Value.([]any); ok { val = is[0] } v := val.(yaml.MapSlice) @@ -340,7 +340,7 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { pass = false } case "windows": - if v, ok := item.Value.([]interface{}); ok { + if v, ok := item.Value.([]any); ok { for i, val := range v { if val.(string) == "global" && fmt.Sprintf("%s", tm.Windows[i]) == "[*]" { continue @@ -376,7 +376,7 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { return true } -func diffPane(eg interface{}, got typex.PaneInfo) bool { +func diffPane(eg any, got typex.PaneInfo) bool { pass := true paneTiming := map[typex.PaneTiming]string{ typex.PaneUnknown: "UNKNOWN", @@ -423,7 +423,7 @@ var nameToType = map[string]reflect.Type{ "f_float": reflectx.Float32, } -func setField(rv reflect.Value, i int, v interface{}) { +func setField(rv reflect.Value, i int, v any) { if v == nil { return } @@ -455,9 +455,9 @@ func setField(rv reflect.Value, i int, v interface{}) { rf.Set(reflect.ValueOf([]byte(v.(string)))) break } - // Value is a []interface{} with string values. + // Value is a []any with string values. var arr []string - for _, a := range v.([]interface{}) { + for _, a := range v.([]any) { arr = append(arr, a.(string)) } rf.Set(reflect.ValueOf(arr)) @@ -499,11 +499,11 @@ func (s *Spec) parseCoder(c Coder) string { // Simple logger to run as main program. type logLogger struct{} -func (*logLogger) Errorf(format string, v ...interface{}) { +func (*logLogger) Errorf(format string, v ...any) { log.Printf(format, v...) } -func (*logLogger) Logf(format string, v ...interface{}) { +func (*logLogger) Logf(format string, v ...any) { log.Printf(format, v...) } diff --git a/sdks/go/test/regression/pardo.go b/sdks/go/test/regression/pardo.go index 7ca3880a0e6..4b8fba7f9dd 100644 --- a/sdks/go/test/regression/pardo.go +++ b/sdks/go/test/regression/pardo.go @@ -94,7 +94,7 @@ func directCountFn(_ int, values func(*int) bool) (int, error) { // DirectParDoAfterGBK generates a pipeline with a direct-form // ParDo after a GBK. See: BEAM-3978 and BEAM-4175. func DirectParDoAfterGBK() *beam.Pipeline { - p, s, col := ptest.Create([]interface{}{1, 2, 3, 4}) + p, s, col := ptest.Create([]any{1, 2, 3, 4}) keyed := beam.GroupByKey(s, beam.AddFixedKey(s, col)) sum := beam.ParDo(s, directCountFn, keyed) @@ -116,7 +116,7 @@ func emitCountFn(_ int, values func(*int) bool, emit func(int)) error { // EmitParDoAfterGBK generates a pipeline with a emit-form // ParDo after a GBK. See: BEAM-3978 and BEAM-4175. func EmitParDoAfterGBK() *beam.Pipeline { - p, s, col := ptest.Create([]interface{}{1, 2, 3, 4}) + p, s, col := ptest.Create([]any{1, 2, 3, 4}) keyed := beam.GroupByKey(s, beam.AddFixedKey(s, col)) sum := beam.ParDo(s, emitCountFn, keyed)