lostluck commented on code in PR #25808:
URL: https://github.com/apache/beam/pull/25808#discussion_r1141423750


##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main

Review Comment:
   It's important to have a blank line after the license and before the 
package documentation, so the license header isn't treated as the package documentation.  
   
   I've added a package comment, which should conform to the style guide: 
https://google.github.io/styleguide/go/decisions#package-comments
   
   ```suggestion
   // limitations under the License.
   
   // slowly_updating_side_input is an example pipeline demonstrating the 
pattern described
   // at https://beam.apache.org/documentation/patterns/side-inputs/.
   package main
   ```



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+       "context"
+       "flag"
+       "strings"
+       "time"
+
+       "cloud.google.com/go/pubsub"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+       register.Function4x0(update)
+       register.Function4x0(process)
+       register.Emitter2[int, string]()
+       register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, 
string)) {
+       log.Infof(ctx, "Making external call %d at %s", i, 
t.ToTime().Format(time.RFC3339))
+
+       // zero is the key used in beam.AddFixedKey which will be applied on 
the main input.
+       id, externalData := 0, "some fake data that changed at 
"+time.Now().Format(time.RFC3339)
+
+       emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) 
func(*string) bool) {
+       log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+       iter := side(k)
+
+       var externalData []string
+       var externalDatum string
+       for iter(&externalDatum) {
+               externalData = append(externalData, externalDatum)
+       }
+
+       log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, 
v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+       if err != nil {
+               log.Fatalf(context.TODO(), format, args...)
+       }
+}
+
+func main() {
+       var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+       var periodicSequenceInterval time.Duration
+
+       now := time.Now()
+
+       flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", 
now.Add(-1*time.Hour).Format(time.RFC3339),
+               "The time at which to start the periodic sequence.")
+
+       flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", 
now.Add(100*time.Hour).Format(time.RFC3339),
+               "The time at which to end the periodic sequence.")
+
+       flag.DurationVar(&periodicSequenceInterval, 
"periodic_sequence_interval", 1*time.Minute,
+               "The interval between periodic sequence output.")
+
+       flag.StringVar(&inputTopic, "input_topic", "input",
+               "The PubSub topic from which to read the main input data.")

Review Comment:
   A little non-standard, but I appreciate how this cleans up and ensures 
consistency in the default time values.



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+       "context"
+       "flag"
+       "strings"
+       "time"
+
+       "cloud.google.com/go/pubsub"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+       register.Function4x0(update)
+       register.Function4x0(process)
+       register.Emitter2[int, string]()
+       register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, 
string)) {
+       log.Infof(ctx, "Making external call %d at %s", i, 
t.ToTime().Format(time.RFC3339))
+
+       // zero is the key used in beam.AddFixedKey which will be applied on 
the main input.
+       id, externalData := 0, "some fake data that changed at 
"+time.Now().Format(time.RFC3339)
+
+       emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) 
func(*string) bool) {
+       log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+       iter := side(k)
+
+       var externalData []string
+       var externalDatum string
+       for iter(&externalDatum) {
+               externalData = append(externalData, externalDatum)
+       }
+
+       log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, 
v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+       if err != nil {
+               log.Fatalf(context.TODO(), format, args...)
+       }
+}
+
+func main() {
+       var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+       var periodicSequenceInterval time.Duration
+
+       now := time.Now()
+
+       flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", 
now.Add(-1*time.Hour).Format(time.RFC3339),
+               "The time at which to start the periodic sequence.")
+
+       flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", 
now.Add(100*time.Hour).Format(time.RFC3339),
+               "The time at which to end the periodic sequence.")
+
+       flag.DurationVar(&periodicSequenceInterval, 
"periodic_sequence_interval", 1*time.Minute,
+               "The interval between periodic sequence output.")
+
+       flag.StringVar(&inputTopic, "input_topic", "input",
+               "The PubSub topic from which to read the main input data.")
+
+       flag.Parse()
+       beam.Init()
+       ctx := context.Background()
+       p, s := beam.NewPipelineWithRoot()
+
+       project := gcpopts.GetProject(ctx)
+       client, err := pubsub.NewClient(ctx, project)
+       fatalf(err, "Failed to create client: %v", err)
+       _, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
+       fatalf(err, "Failed to ensure topic: %v", err)
+
+       mainInput := beam.WindowInto(
+               s,
+               window.NewFixedWindows(periodicSequenceInterval),
+               beam.AddFixedKey( // simulate keyed data by adding a fixed key
+                       s,
+                       pubsubio.Read(
+                               s,
+                               project,
+                               inputTopic,
+                               nil,
+                       ),
+               ),
+               beam.Trigger(trigger.Repeat(trigger.Always())),
+               beam.PanesDiscard(),
+       )
+
+       startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart)
+       endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd)
+       sideInput := beam.WindowInto(s, 
window.NewFixedWindows(periodicSequenceInterval),
+               beam.ParDo(
+                       s,
+                       update,
+                       periodic.Impulse(
+                               s,
+                               startTime,
+                               endTime,
+                               periodicSequenceInterval,
+                       ),
+               ),
+               beam.Trigger(trigger.Repeat(trigger.Always())),
+               beam.PanesDiscard(),
+       )

Review Comment:
   The nesting here has the added deficit of hiding the core of the example.
   
   ```suggestion
      // Generate an impulse every period.
       periodicImp := periodic.Impulse(s, startTime, endTime, 
periodicSequenceInterval)
       
       // Use the impulse to trigger some other ordinary transform.
       updatedImp := beam.ParDo(s, update, periodicImp)
       
       // Window for use as a side input, to allow the input to change with 
windows.
        sideInput := beam.WindowInto(s, 
window.NewFixedWindows(periodicSequenceInterval),
                updatedImp,
                beam.Trigger(trigger.Repeat(trigger.Always())),
                beam.PanesDiscard(),
        )
   ```
   
   I'll note that the window for the side input is usually going to be larger 
than the window for the main processing. While using the same window isn't wrong, 
the usual goal of the pattern is a situation like allowing files that change 
hourly to be read in once each hour, while the more frequent main-input data 
re-uses the cached, already-read file. (Granted, this behavior isn't yet enabled 
by default in the Go SDK, but that's an aside.)



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time

Review Comment:
   This will work in the direct runner, but that's because the direct runner 
won't successfully run the example or anything local. It would be better to 
fold things into the sequence definition to enable appropriate testing behavior.



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+       "context"
+       "flag"
+       "strings"
+       "time"
+
+       "cloud.google.com/go/pubsub"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+       register.Function4x0(update)
+       register.Function4x0(process)
+       register.Emitter2[int, string]()
+       register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, 
string)) {
+       log.Infof(ctx, "Making external call %d at %s", i, 
t.ToTime().Format(time.RFC3339))
+
+       // zero is the key used in beam.AddFixedKey which will be applied on 
the main input.
+       id, externalData := 0, "some fake data that changed at 
"+time.Now().Format(time.RFC3339)
+
+       emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) 
func(*string) bool) {
+       log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+       iter := side(k)
+
+       var externalData []string
+       var externalDatum string
+       for iter(&externalDatum) {
+               externalData = append(externalData, externalDatum)
+       }
+
+       log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, 
v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+       if err != nil {
+               log.Fatalf(context.TODO(), format, args...)
+       }
+}
+
+func main() {
+       var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+       var periodicSequenceInterval time.Duration
+
+       now := time.Now()
+
+       flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", 
now.Add(-1*time.Hour).Format(time.RFC3339),
+               "The time at which to start the periodic sequence.")
+
+       flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", 
now.Add(100*time.Hour).Format(time.RFC3339),
+               "The time at which to end the periodic sequence.")
+
+       flag.DurationVar(&periodicSequenceInterval, 
"periodic_sequence_interval", 1*time.Minute,
+               "The interval between periodic sequence output.")
+
+       flag.StringVar(&inputTopic, "input_topic", "input",
+               "The PubSub topic from which to read the main input data.")
+
+       flag.Parse()
+       beam.Init()
+       ctx := context.Background()
+       p, s := beam.NewPipelineWithRoot()
+
+       project := gcpopts.GetProject(ctx)
+       client, err := pubsub.NewClient(ctx, project)
+       fatalf(err, "Failed to create client: %v", err)
+       _, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
+       fatalf(err, "Failed to ensure topic: %v", err)
+
+       mainInput := beam.WindowInto(
+               s,
+               window.NewFixedWindows(periodicSequenceInterval),
+               beam.AddFixedKey( // simulate keyed data by adding a fixed key
+                       s,
+                       pubsubio.Read(
+                               s,
+                               project,
+                               inputTopic,
+                               nil,
+                       ),
+               ),
+               beam.Trigger(trigger.Repeat(trigger.Always())),
+               beam.PanesDiscard(),
+       )

Review Comment:
   Don't do this. This is far less readable than having single-use variables, 
which make the structure of the pipeline explicit and linear with reading the 
program.
   
   Nesting like this forces the reader to read to the middle of the code, then 
backtrack up, in order to understand the pipeline. That isn't helpful for 
example code, which will be read far more often than it is written.
   
   ```suggestion
       source :=  pubsubio.Read(s, project, inputTopic, nil)
       keyedSource :=   beam.AddFixedKey(s, source) // simulate keyed data by 
adding a fixed key
        mainInput := beam.WindowInto(
                s,
                keyedSource,
                window.NewFixedWindows(periodicSequenceInterval),
                beam.Trigger(trigger.Repeat(trigger.Always())),
                beam.PanesDiscard(),
        )
   ```



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+       "context"
+       "flag"
+       "strings"
+       "time"
+
+       "cloud.google.com/go/pubsub"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+       register.Function4x0(update)
+       register.Function4x0(process)
+       register.Emitter2[int, string]()
+       register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, 
string)) {
+       log.Infof(ctx, "Making external call %d at %s", i, 
t.ToTime().Format(time.RFC3339))

Review Comment:
   It's a small but critical thing that we clarify this is in event time, since 
it might not match processing time, depending on how the source is configured.
   
   ```suggestion
        log.Infof(ctx, "Making external call %d at event time %s", i, 
t.ToTime().Format(time.RFC3339))
   ```



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########


Review Comment:
   We should be able to add a short unit test if using the `prism` runner 
directly. 
   
   While the runner isn't fully complete yet, it *does* run and execute 
ProcessContinuation transforms and watermarks!
   
   It just doesn't do splitting yet, or actually "wait" for any process 
continuations at the moment. But when the "sequence" is done, it will 
terminate, so we can add a test with a period of one second and a duration of 
one minute, and then check that we get 60 elements out of the transform. 
(There's a small risk of getting 59 instead, as a flake...)



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {

Review Comment:
   Just to explain what happens here and how it works:
   
   The transform will continue until the restriction is fully processed, or the 
runner has told the bundle to split. On that split, TryClaim returns false, 
GetError returns nil, and IsDone returns false, so we reschedule for later with 
the ResumeProcessingIn continuation. Note that this isn't a hard timer, and a 
runner may be early or late (usually early).
   
   The output watermark is held to the last output timestamp. When Stop 
happens, the watermark can then advance to wherever the source is at (probably 
the end of the global window).



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {
+                       emit(mtime.FromTime(currentOutputTimestamp), 
currentOutputTimestamp.UnixMilli())
+                       currentOutputIndex += 1
+                       currentOutputTimestamp = sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+                       currentTime = fn.now()
+                       we.UpdateWatermark(currentOutputTimestamp)
+               } else if err := rt.GetError(); err != nil || rt.IsDone() {
+                       // Stop processing on error or completion
+                       return sdf.StopProcessing(), rt.GetError()
+               } else {
+                       return sdf.ResumeProcessingIn(sd.Interval), nil
+               }
+       }
+
+       return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+       ApplyWindow bool
+
+       now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+       return func(o *impulseConfig) error {
+               o.ApplyWindow = true
+               return nil
+       }
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {

Review Comment:
   As mentioned previously, this can't be serialized, so it's non-portable (and 
it's complicated to validate once it has passed through serialization, if it's 
hacked around). I'd remove it.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {
+                       emit(mtime.FromTime(currentOutputTimestamp), 
currentOutputTimestamp.UnixMilli())
+                       currentOutputIndex += 1
+                       currentOutputTimestamp = sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+                       currentTime = fn.now()
+                       we.UpdateWatermark(currentOutputTimestamp)
+               } else if err := rt.GetError(); err != nil || rt.IsDone() {
+                       // Stop processing on error or completion
+                       return sdf.StopProcessing(), rt.GetError()
+               } else {
+                       return sdf.ResumeProcessingIn(sd.Interval), nil
+               }
+       }
+
+       return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+       ApplyWindow bool
+
+       now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption

Review Comment:
   Since this is just immediately aliased in the same package, the code may as 
well directly export the type. The indirection adds nothing but possible 
confusion for readers switching between the two ways of referring to 
the same type.
   
   Per the style guide: Don't use aliases if they aren't needed 
https://google.github.io/styleguide/go/decisions#type-aliases



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {
+                       emit(mtime.FromTime(currentOutputTimestamp), 
currentOutputTimestamp.UnixMilli())
+                       currentOutputIndex += 1
+                       currentOutputTimestamp = sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+                       currentTime = fn.now()
+                       we.UpdateWatermark(currentOutputTimestamp)
+               } else if err := rt.GetError(); err != nil || rt.IsDone() {
+                       // Stop processing on error or completion
+                       return sdf.StopProcessing(), rt.GetError()
+               } else {
+                       return sdf.ResumeProcessingIn(sd.Interval), nil
+               }
+       }
+
+       return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+       ApplyWindow bool
+
+       now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {

Review Comment:
   I don't expect additional options will be added, so let's match Java and 
Python here and use a boolean.
   
   Effectively this transform just produces a PCollection, which means any 
expansions can be ordinary DoFns or other composites to add other 
functionality. No need to overcomplicate things here.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {
+                       emit(mtime.FromTime(currentOutputTimestamp), 
currentOutputTimestamp.UnixMilli())
+                       currentOutputIndex += 1
+                       currentOutputTimestamp = sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+                       currentTime = fn.now()
+                       we.UpdateWatermark(currentOutputTimestamp)
+               } else if err := rt.GetError(); err != nil || rt.IsDone() {
+                       // Stop processing on error or completion
+                       return sdf.StopProcessing(), rt.GetError()
+               } else {
+                       return sdf.ResumeProcessingIn(sd.Interval), nil
+               }
+       }
+
+       return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+       ApplyWindow bool
+
+       now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+       return func(o *impulseConfig) error {
+               o.ApplyWindow = true
+               return nil
+       }
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {
+       return func(o *impulseConfig) error {
+               o.now = now
+               return nil
+       }
+}
+
+// Impulse is a PTransform which generates a sequence of timestamped
+// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
+// element will be assigned to its own fixed window of interval size.
+//
+// The transform behaves the same as [Sequence] transform, but can be
+// used as the first transform in a pipeline.
+//
+// The following applies to the arguments.
+//   - if interval <= 0, interval is set to [math.MaxInt64]
+//   - if start is a zero value [time.Time], start is set to the current time
+//   - if start is after end, start is set to end
+//
+// The PCollection generated by Impulse is unbounded and the output elements
+// are the [time.UnixMilli] int64 values of the output timestamp.
+func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts 
...ImpulseOption) beam.PCollection {
+       if interval <= 0 {
+               interval = math.MaxInt64
+       }
+       if start.IsZero() {
+               start = time.Now()
+       }
+       if start.After(end) {
+               start = end
+       }

Review Comment:
   And mtime.Time values are prevented from being serialized. 
   
   I'd use int64s: it makes the intent clear, and it's an implementation detail 
(I did so for a similar, but test-specific, transform for testing prism). It also 
reduces the likelihood of architecture-specific issues, since int's width is 
architecture-dependent.
   
   TBH it's odd to me that the "values" for the periodic impulses are time 
instants, which are the event time anyway. I'd prefer we avoid the needless 
duplication Java and Python take by just having int for Sequence, and []byte{} 
for Impulse. Int is specific and clear enough for Sequence (it's a sequence of 
numbers, done), but int64 is likely better for clarity on range across 
architectures. []byte is what the normal Impulse returns, so it would make 
the transform more compatible with, and a drop-in replacement for, existing 
transforms that consume the []byte, even if it's ignored.
   
   We can also just encode the time/timestamp into the []byte, if we so choose, 
but again, it's redundant.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}

Review Comment:
   It should look at the existing restriction and modify start and end 
appropriately, but this might be OK for periodic sequence (especially if it's 
been validated to work as expected on a runner like Google Cloud Dataflow).
   
   Basically, the restriction will be immediately provided to the 
ProcessElement method, and that method needs to handle the given restriction 
correctly, so it's probably nice to know what the last "state" of the 
restriction is, even if a ProcessContinuation Stop is immediately returned 
next.
   
   It's a little unintuitive, but to avoid data-loss errors, drain can be 
thought of as simply advancing all root watermarks immediately to 
EndOfGlobalWindow, and allowing the pipeline to terminate processing. The 
TruncateRestriction calls re-process the current restrictions and shrink them. 
E.g. TruncateRestriction is where an IO could communicate with and write to an 
external system about what hasn't been processed yet, so the next caller of 
that system can pick up where it left off. Or a system like Pubsub 
effectively handles that case by simply not acking any of the outstanding 
messages, so they'll be delivered the next time the subscriber reads 
the topic. Different systems, both effective.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, 
*sdf.LockRTracker, SequenceDefinition,
+               func(beam.EventTime, int64),
+               sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+       register.Emitter2[beam.EventTime, int64]()
+       beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+       Interval time.Duration
+       Start    time.Time
+       End      time.Time
+}
+
+type sequenceGenDoFn struct {
+       now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+       if fn.now == nil {
+               fn.now = time.Now
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) 
offsetrange.Restriction {
+       totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+       return offsetrange.Restriction{
+               Start: int64(0),
+               End:   int64(totalOutputs),
+       }
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ 
SequenceDefinition) offsetrange.Restriction {
+       return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() 
*sdf.ManualWatermarkEstimator {
+       return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we 
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, 
emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+       currentOutputIndex := 
rt.GetRestriction().(offsetrange.Restriction).Start
+       currentOutputTimestamp := sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+       currentTime := fn.now()
+       we.UpdateWatermark(currentOutputTimestamp)
+       for currentOutputTimestamp.Before(currentTime) {
+               if rt.TryClaim(currentOutputIndex) {
+                       emit(mtime.FromTime(currentOutputTimestamp), 
currentOutputTimestamp.UnixMilli())
+                       currentOutputIndex += 1
+                       currentOutputTimestamp = sd.Start.Add(sd.Interval * 
time.Duration(currentOutputIndex))
+                       currentTime = fn.now()
+                       we.UpdateWatermark(currentOutputTimestamp)
+               } else if err := rt.GetError(); err != nil || rt.IsDone() {
+                       // Stop processing on error or completion
+                       return sdf.StopProcessing(), rt.GetError()
+               } else {
+                       return sdf.ResumeProcessingIn(sd.Interval), nil
+               }
+       }
+
+       return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+       ApplyWindow bool
+
+       now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+       return func(o *impulseConfig) error {
+               o.ApplyWindow = true
+               return nil
+       }
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {
+       return func(o *impulseConfig) error {
+               o.now = now
+               return nil
+       }
+}
+
+// Impulse is a PTransform which generates a sequence of timestamped
+// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
+// element will be assigned to its own fixed window of interval size.
+//
+// The transform behaves the same as [Sequence] transform, but can be
+// used as the first transform in a pipeline.
+//
+// The following applies to the arguments.
+//   - if interval <= 0, interval is set to [math.MaxInt64]
+//   - if start is a zero value [time.Time], start is set to the current time
+//   - if start is after end, start is set to end
+//
+// The PCollection generated by Impulse is unbounded and the output elements
+// are the [time.UnixMilli] int64 values of the output timestamp.
+func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts 
...ImpulseOption) beam.PCollection {
+       if interval <= 0 {
+               interval = math.MaxInt64
+       }
+       if start.IsZero() {
+               start = time.Now()
+       }
+       if start.After(end) {
+               start = end
+       }

Review Comment:
   For handling End, we can also just have a variable set to the "largest practical 
time", and use that. It's going to be a very rare case where someone wants to 
process a stream of data that extends past the year 9999. (We could work 
around that by changing the serialization... but that's not urgent now.) 
   
   We can also pick another arbitrary time with a nanosecond-scale 
difference as a sentinel value (e.g. Zero + 1 
nanosecond), which we check for and treat as End of Time. Beam doesn't promise 
handling event time granularities under microseconds IIRC (if smaller than 
milliseconds at all...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to