lostluck commented on code in PR #25808: URL: https://github.com/apache/beam/pull/25808#discussion_r1142383516
########## sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "context" + "flag" + "strings" + "time" + + "cloud.google.com/go/pubsub" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx" +) + +func init() { + register.Function4x0(update) + register.Function4x0(process) + register.Emitter2[int, string]() + register.Iter1[string]() +} + +// update simulates an external call to get data for the side input. 
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) { + log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339)) + + // zero is the key used in beam.AddFixedKey which will be applied on the main input. + id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339) + + emit(id, externalData) +} + +// process simulates processing of main input. It reads side input by key +func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) { + log.Infof(ctx, "Processing (key:%d,value:%q)", k, v) + + iter := side(k) + + var externalData []string + var externalDatum string + for iter(&externalDatum) { + externalData = append(externalData, externalDatum) + } + + log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ",")) +} + +func fatalf(err error, format string, args ...interface{}) { + if err != nil { + log.Fatalf(context.TODO(), format, args...) 
+ } +} + +func main() { + var inputTopic, periodicSequenceStart, periodicSequenceEnd string + var periodicSequenceInterval time.Duration + + now := time.Now() + + flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339), + "The time at which to start the periodic sequence.") + + flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339), + "The time at which to end the periodic sequence.") + + flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute, + "The interval between periodic sequence output.") + + flag.StringVar(&inputTopic, "input_topic", "input", + "The PubSub topic from which to read the main input data.") + + flag.Parse() + beam.Init() + ctx := context.Background() + p, s := beam.NewPipelineWithRoot() + + project := gcpopts.GetProject(ctx) + client, err := pubsub.NewClient(ctx, project) + fatalf(err, "Failed to create client: %v", err) + _, err = pubsubx.EnsureTopic(ctx, client, inputTopic) + fatalf(err, "Failed to ensure topic: %v", err) + + mainInput := beam.WindowInto( + s, + window.NewFixedWindows(periodicSequenceInterval), + beam.AddFixedKey( // simulate keyed data by adding a fixed key + s, + pubsubio.Read( + s, + project, + inputTopic, + nil, + ), + ), + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) + + startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart) + endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd) + sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval), + beam.ParDo( + s, + update, + periodic.Impulse( + s, + startTime, + endTime, + periodicSequenceInterval, + ), + ), + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) Review Comment: WRT the cache: Beam's execution abstraction, the FnAPI, has provisions for caching data cross bundle from the StateAPI on the SDK side, in order to avoid repeated deserialization, 
and additional round trips to the primary store of the runner to fetch state data. Side inputs also come across the StateAPI. This caching is particularly valuable for streaming jobs, as a single "SDK harness" is typically responsible for the same key all the time, so for tight windows it would look up the same data from the side inputs repeatedly. --- WRT not applying after the impulse change: that makes sense, since an int64 was being received from the "update" function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org