Buckle up, it's time to dive into Side Inputs along with Go SDK code links!
First and foremost, all the protos that cover side inputs (namely beam_runner_api.proto [1] and beam_fn_api.proto [2], which nominally handle construction time and execution time concerns respectively) refer to the design doc at https://s.apache.org/beam-fn-state-api-and-bundle-processing [3] when it comes to details around side inputs and the State API. I'm going to cover side inputs exclusively, from the user side, to construction, to execution.

Side Inputs are window-scoped data outside of the parallel input that can be supplied through the runner. From the user side, this allows a pipeline to use any PCollection as additional (side) input to a DoFn, while also handling whether that side input is in the same window as the parallel input. It's better described in the programming guide section on Side Inputs [4], so I won't repeat their utility here. On the execution side, the data gets to the SDK side worker using the State Channel and API (described in [3]).

At pipeline construction time, an SDK will convert its own internal constructs into the Beam Pipeline protos, and in the Go SDK, that happens in graphx/translate.go. The SDK translates DoFns into ParDoPayloads, populating a map of SideInput protos [5][6]. But before any of that, the user needs to be able to specify that side inputs are to be used at all. So how does a user do that in Go?

*# Part 1: Decomposition*

In the Go SDK, because generics didn't exist at the time of design (not till Go 1.18 at soonest), we heavily use the original generic concept: functions with typed parameters. If you squint, funcs can be viewed as generic (apologies to the PL purists in the crowd). See the SDK's RFC [7] for more.

Throughout this explanation I'll be using loose Go syntax to represent some concepts. This is not real Go code, and these symbols are arbitrary types, rather than concrete types.
After the user hands us a DoFn, we decompose its lifecycle methods (like ProcessElement) to see if it's well formed, and what inputs it accepts and outputs it emits. A DoFn's ProcessElement method can specify a wide variety of inputs. The designers ended up decreeing that we'll just specify an order to the parameters (instead of explicit tags, like Java does), and that order determines a parameter's role in a ParDo's graph representation. Roughly, as a pseudo-regular expression, a ProcessElement is defined as follows:

    func(FnValue, SideInput*, FnEmit*)

Here FnValue means any user element in a PCollection, SideInput can be one of FnValue, FnIter, or FnReIter, and FnEmit represents outputs from this DoFn. FnIter and FnReIter represent Beam Iterables, used either for side inputs or for GBK values (mentioned in my last email). This is a dramatic simplification of the more complete expression for any lifecycle method (seen at [8]), which has a more complete ordering. Order is important, as we don't have much else to go on: the first FnValue is always the parallel input. Those Fn* terms are also defined in that same file. This is how the SDK understands functions and parameters, through decomposition.

In the Go SDK, side inputs can be an iterable (represented by a `func(*V) bool` type), a ReIterable (represented by a `func() func(*V) bool` type), or a singleton (represented by the type V itself). We use functions to handle iterables because of the lack of generics. Similarly, we can and should handle functional representations of map access to that data (the subject of this current discussion, BEAM-3293 <https://issues.apache.org/jira/browse/BEAM-3293>).

To see what a given user type parameter represents, we have helper functions [9] to make that determination, called in a state machine [10]. To handle map functions for BEAM-3293, new functions, representation kinds, and cases would need to be added there.
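To make the three shapes above concrete, here is a minimal sketch of classifying a function's parameters by reflection. It only assumes the shapes named in this email (`func(*V) bool`, `func() func(*V) bool`, and a plain V). The names paramKind, isIter, isReIter, and classifyParams are hypothetical and invented for this sketch; they are not the SDK's helpers in funcx, which handle many more cases (emitters, context, windows, and so on).

```go
package main

import (
	"fmt"
	"reflect"
)

// paramKind is a hypothetical label used only in this sketch.
type paramKind string

const (
	kindValue  paramKind = "value"  // plain V: element or singleton side input
	kindIter   paramKind = "iter"   // func(*V) bool or func(*K, *V) bool
	kindReIter paramKind = "reiter" // func() func(*V) bool
	kindOther  paramKind = "other"  // any other func shape (emitters, etc.)
)

// isIter reports whether t has the shape func(*V) bool or func(*K, *V) bool.
func isIter(t reflect.Type) bool {
	if t.Kind() != reflect.Func || t.NumOut() != 1 || t.Out(0).Kind() != reflect.Bool {
		return false
	}
	if t.NumIn() < 1 || t.NumIn() > 2 {
		return false
	}
	for i := 0; i < t.NumIn(); i++ {
		if t.In(i).Kind() != reflect.Ptr {
			return false
		}
	}
	return true
}

// isReIter reports whether t has the shape func() func(*V) bool.
func isReIter(t reflect.Type) bool {
	return t.Kind() == reflect.Func && t.NumIn() == 0 && t.NumOut() == 1 && isIter(t.Out(0))
}

// classifyParams labels each parameter of fn, in order.
// Whether a "value" is the parallel input or a singleton side input
// cannot be decided here; that needs the input PCollections.
func classifyParams(fn interface{}) []paramKind {
	t := reflect.TypeOf(fn)
	kinds := make([]paramKind, 0, t.NumIn())
	for i := 0; i < t.NumIn(); i++ {
		p := t.In(i)
		switch {
		case isIter(p):
			kinds = append(kinds, kindIter)
		case isReIter(p):
			kinds = append(kinds, kindReIter)
		case p.Kind() == reflect.Func:
			kinds = append(kinds, kindOther)
		default:
			kinds = append(kinds, kindValue)
		}
	}
	return kinds
}

func main() {
	doFn := func(elm string, side func(*int) bool, again func() func(*int) bool, single int) {}
	fmt.Println(classifyParams(doFn))
}
```

A random access map shape such as `func(K) func(*V) bool` would fall into the "other" bucket in this sketch, which is roughly the gap that new helper functions and representation kinds would need to fill.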
A wrinkle in this is that side inputs can be a singleton value (a PCollection with 1 element), so the SDK can't statically know what a DoFn's parameters mean without the context of its input PCollections. Depending on the input PCollection, a func(K,V) that is decomposed to func(FnValue, FnValue) could either be a DoFn that processes a PCollection<K> with a singleton side input PCollection<V>, or a DoFn that processes a PCollection<KV<K,V>>. For that we need to bind the inputs to the DoFn parameters.

*# Part 2: Binding*

Now that the SDK understands the shape of the user DoFn, it needs to understand how it connects to the graph. This is done by binding the input PCollections to the DoFn's parameters. Pertinent to this discussion is [11], where side input kinds are bound to specific types. This would need to handle and produce the new map kinds. Side input maps are simpler, since we know they *must* be associated with a PCollection<KV<K, V>>.

If the Nth side input PCollection doesn't match the Nth side input parameter, then an error should occur at binding time. Binding errors are tricky since the mistake is definitely at the specific DoFn, but all we know is that there's a mismatch between what the DoFn requires (determined by the earlier decomposition) and what was provided by pipeline construction (via beam.SideInput options). But if the shapes match, and the Go types match, we're golden, and can move on to proto translation. This should mostly be handled already, but it's worth looking into specifically when adding a new side input kind.

*# Part 3: Translation to Protos*

Now that the user half of the SDK understands map function parameter kinds as side inputs, we need to translate them to the proto pipeline graph. Like most translations to the Beam protos, this happens in graphx/translate.go at [6]. This is arguably the critical bit, as it defines how the runner is supposed to serve values. How are iterable side inputs currently handled?
Reading down from [6], when side inputs are present, the Go SDK will add an additional transform that converts the input PCollection<Foo> into a PCollection<KV<K, Foo>> where the key is always the empty string (""). This is represented by the Go SDK specific URNIterableSideInputKey URN. All such SDK specific URNs are prefixed by "beam:go" to avoid mixing them up with actual URNs defined by the Beam protos. This URN will be detected at execution time. Most of that case statement is there to set up that additional fixed key transform in the graph. The local side input key (i#) is mapped to pull its values from the keyed input (referring to the global key), and the side input is added to the map under that same local key.

*Aside:* It appears the Go SDK presently ignores concepts like the ViewFn and WindowMappingFn, one or both of which might allow the SDK to avoid adding the additional keying step as a PTransform. This is technical debt, as this is probably not how things are done in the portable Python and Java SDKs. I'm uncertain how much this may need to change to enable map views, but I suspect not at all at this time.

The short version for random access side input maps is, we're saying "we're passing a PCollection<KV<K,V>> and would like you to be able to spit the values back out at us on demand". This means all we're putting into the Map or MultiMap case at line 324 below (co-opted for the functional interpretations, rather than as single map[K]V values) is the section starting at line 312, populating the side input map [12].

*# Part 4: Execution from Protos*

OK, so, at this point, the user has constructed a pipeline and started a job with a runner, which in turn spun up workers. Those workers include an SDK side harness that lives largely in the Go SDK exec and harness packages. It connects to the FnAPI services on the runner side, and (loosely) starts to receive ProcessBundleDescriptors, which describe subgraphs to process and their associated data.
Focusing still on side inputs, there are two things to look into to understand what's going on on the execution side (mostly handled in exec/translate.go [13]): that URN for iterable side inputs, and preparing to read side inputs.

First is how we use that URNIterableSideInputKey URN. That's done at [14], where all it does is create a special FixedKey execution node [15]. This is a special node that turns a PCollection<V> into a PCollection<KV<K, V>>. This includes the case where that original V is actually a KV itself, like KV<K2, V2>, meaning the output would have nested KVs: PCollection<KV<K, KV<K2, V2>>>. Regardless, in most cases, the next node would be a datasink, which then encodes the values and outputs them to the runner. This is happening to the PCollection being used as a side input.

In particular, all values are essentially wrapped as the value of a KV, and associated with a fixed key, in this case the empty string (""). That key is important, since it's the key for the State API's MultiMap side input when iterables are read. If we didn't associate everything with a single key, the runner would allow random access with given key requests, which is what we're implementing now. Fancy that :D.

On the side of the ParDo that's going to be reading the side input, it receives a variety of data that it can use to query the State API and process responses from it, such as the port to query against, the id for this set of state, the receiving transform requesting the data, and the window coder and element coders [16]. These are wrapped into a side input adapter [17] and passed to the ParDo for actual execution [18].

*Aside:* How about that, a comment about how we aren't using view_fn and window_mapping_fn... Likely critical for using side inputs with triggers and non-Global windows... No matter for now.

*# Part 5: Reading the Side Input*

Elsewhere the harness has received the configuration to start executing the plan.
Data is about to flow through the Data channel, and provide our DoFn with primary input elements. But what about side inputs? Before each primary input element, we do some initialization to make sure that we have the right data to pass along with it. Notionally this is a bit of light caching to ensure that the element we're executing shares a window with the cached data. If the cache isn't initialized yet, or the windows don't match, then we need to rebuild it from scratch. This is in ParDo.initSideInput [19]. Each side input configuration was wrapped into an adapter that knows how to produce iterables or whatever else it needs, which is what's happening here. This will most likely need to change to handle maps as well, since it was built with the assumption of only having iterables. This work changes that assumption.

The Go SDK abstracts streams of data from wherever they're coming from with ReStreams and Streams [20], and ReusableInputs [21]. The streams are how decoded data is provided or reset, while the ReusableInputs are for rewinding those without necessarily making new runner requests. This likely won't need to change entirely with maps, but at present, we haven't made it simple to provide a new key for querying.

How do we use user side keys for querying though? The SideInputAdapter NewIterable demonstrates how: we take the key, encode it along with the window, and request the element stream from the StateReader [22]. The StateReader abstracts actually calling the State service for us, and its OpenSideInput method should continue to serve that purpose for us. It is implemented in harness/statemgr.go [23]. The StateReader is also where most of the protocol in the State API doc [3] is implemented.

With a simple iterable (say func(*V) bool), the user passes an allocated v pointer to the iterable, and then something happens, and if it's successful, the function returns true, indicating the value is ready for use. Same if it were a func(*K,*V) bool.
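To make that iterable contract concrete from the user's point of view, here is a minimal sketch. It assumes an in-memory slice as a stand-in for the decoded element stream; sliceIter, sliceReIter, and collect are hypothetical names for this email only and are not part of the SDK.

```go
package main

import "fmt"

// sliceIter returns an iterable of the func(*V) bool shape (with V = string).
// The slice is a stand-in for the decoded stream the harness would provide.
func sliceIter(vals []string) func(*string) bool {
	next := 0
	return func(v *string) bool {
		if next >= len(vals) {
			return false // exhausted: v is left untouched
		}
		*v = vals[next] // fill the caller's allocated value
		next++
		return true // the value is ready for use
	}
}

// sliceReIter returns a ReIterable: each call starts a fresh pass over the data.
func sliceReIter(vals []string) func() func(*string) bool {
	return func() func(*string) bool { return sliceIter(vals) }
}

// collect drains an iterable the way a DoFn body typically would.
func collect(iter func(*string) bool) []string {
	var out []string
	var v string
	for iter(&v) {
		out = append(out, v)
	}
	return out
}

func main() {
	iter := sliceIter([]string{"a", "b", "c"})
	fmt.Println(collect(iter))      // first pass sees every element
	fmt.Println(len(collect(iter))) // a plain iterable cannot be rewound

	re := sliceReIter([]string{"a", "b"})
	fmt.Println(collect(re()), collect(re())) // each re() is a fresh pass
}
```

The difference between the two shapes is just whether the caller can ask for a new pass; in the SDK, that rewinding is what the ReStream and ReusableInput abstractions are for.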
These functions are generated via reflection in exec/input.go [24] or by code generation before compile time. We use this approach because generics don't exist yet, so we had no alternative. Until they do (next year sometime), map side inputs will need to do the same. We need reflection or code generation so we can have the actual type used by the user back in Part 1, otherwise invoking the DoFn will fail at runtime. See the Design RFC [7] for the rationale.

Let's say we're using a random access function, with function types like func(K) func(*V) bool. We need a struct to handle its own state (the references to streams and such) and have a method that can pretend to be that type. For iterators that type is iterValue [25], with its invoke method [26]. The invoke method matches the signature that the reflect.MakeFunc function expects, which can then produce a value with the right function type. You can see makeIter [27] for how that call happens.

So say we have a mapValue type with an invoke method. It would need to take in its key argument and pass that to some ReStream object. ReStream is an interface type, so we can contrive things so that we know its concrete type, and type assert it (say v.s.(mapReStream)), at which point we pass it the key. Then we receive the stream of values as normal, which can then be handled by an instance of iter.

Note, I've changed the user type to `func(K) func(*V) bool` because at this point it could be a multimap, which would require its own iteration. We can always add additional variations if we like, but it certainly complicates Part 1 and Part 5. At this point, once we can iterate through multimaps directly, I'd be willing to wait for generics, where most of the complexity can be simplified into generic parent structs (like beam.KV, or beam.Iterator, or beam.ReIterator, or beam.MultiMap...).

This is far from fully complete, but this should be able to get you started, I think?
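To make the mapValue idea a bit more concrete, here is a minimal sketch of producing a `func(K) func(*V) bool` with reflect.MakeFunc. It is not the SDK's code: mapValue, makeMap, and newLookup are hypothetical, the iterValue here is a simplified stand-in for the real one at [25], and a plain in-memory multimap stands in for the keyed ReStream and the State API request.

```go
package main

import (
	"fmt"
	"reflect"
)

// iterValue holds the state for one pass over the values of a single key.
type iterValue struct {
	vals []interface{}
	next int
}

// invoke has the signature reflect.MakeFunc expects and behaves like func(*V) bool.
func (it *iterValue) invoke(args []reflect.Value) []reflect.Value {
	if it.next >= len(it.vals) {
		return []reflect.Value{reflect.ValueOf(false)}
	}
	// args[0] is the user's *V; store the next value through the pointer.
	args[0].Elem().Set(reflect.ValueOf(it.vals[it.next]))
	it.next++
	return []reflect.Value{reflect.ValueOf(true)}
}

// mapValue holds the state behind a func(K) func(*V) bool.
// The in-memory map is an assumption standing in for a keyed stream lookup.
type mapValue struct {
	fnType reflect.Type // the user's func(K) func(*V) bool type
	data   map[interface{}][]interface{}
}

// invoke takes the key argument and returns an iterator of the user's iterator type.
func (m *mapValue) invoke(args []reflect.Value) []reflect.Value {
	key := args[0].Interface()
	it := &iterValue{vals: m.data[key]} // a missing key yields an empty iterator
	iterFn := reflect.MakeFunc(m.fnType.Out(0), it.invoke)
	return []reflect.Value{iterFn}
}

// makeMap produces a function value of type t backed by data.
func makeMap(t reflect.Type, data map[interface{}][]interface{}) reflect.Value {
	m := &mapValue{fnType: t, data: data}
	return reflect.MakeFunc(t, m.invoke)
}

// newLookup fixes K = string and V = int so callers get a concrete function type.
func newLookup(data map[interface{}][]interface{}) func(string) func(*int) bool {
	var shape func(string) func(*int) bool
	return makeMap(reflect.TypeOf(shape), data).Interface().(func(string) func(*int) bool)
}

func main() {
	lookup := newLookup(map[interface{}][]interface{}{"a": {1, 2}, "b": {3}})
	var v, sum int
	for iter := lookup("a"); iter(&v); {
		sum += v
	}
	fmt.Println(sum)
}
```

In the real harness the lookup would encode the key and window and open a side input stream rather than index a map, and decoding errors would need handling; the sketch only shows how the reflected function types fit together.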
Cheers,
Robert Burke
Who can't quite write these emails in his sleep, but did know everywhere to look in the Go SDK.

[1] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto
[2] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/fn-execution/src/main/proto/beam_fn_api.proto
[3] https://s.apache.org/beam-fn-state-api-and-bundle-processing
[4] https://beam.apache.org/documentation/programming-guide/#side-inputs
[5] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L1186
[6] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L278
[7] https://s.apache.org/beam-go-sdk-design-rfc
[8] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L379
[9] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/sideinput.go#L32
[10] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L318
[11] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/graph/bind.go#L179
[12] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L312
[13] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go
[14] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L464
[15] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L108
[16] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402
[17] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L48
[18] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L38
[19] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L254
[20] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L57
[21] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L36
[22] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L59
[23] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
[24] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107
[25] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L98
[26] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L153
[27] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107

On Mon, Mar 29, 2021 at 5:38 PM Ahmet Altay wrote:

> Adding some folks who might be able to help: @Robert Burke @Kenneth Knowles @Tyson Hamilton
>
> On Mon, Mar 29, 2021 at 2:31 PM Miguel Anzo Palomo wrote:
>
>> Hi,
>> I was checking out this task BEAM-3293
>> <https://issues.apache.org/jira/browse/BEAM-3293> and
>> I'm having some issues fully understanding the idea of how side inputs work internally. Is
>> there any resource or specific example that can help to better understand
>> how they work and why/where the lazy map implementation would help so I can
>> get a better grasp of the task?
>>
>> Thanks
>>
>> --
>> Miguel Angel Anzo Palomo | WIZELINE
>> Software Engineer
>> Remote Office
