Buckle up, it's time to dive into Side Inputs along with Go SDK code links!

First and foremost, all the protos that cover side inputs (namely
beam_runner_api.proto [1], and beam_fn_api.proto [2], which nominally
handle construction time, and execution time concerns respectively) refer
to the design doc at
https://s.apache.org/beam-fn-state-api-and-bundle-processing [3] when it
comes to details around side inputs and the state API. I'm going to cover
side inputs exclusively, from User side, to construction, to execution.

Side Inputs are window scoped data outside of the parallel input that can
be supplied through the runner. From the user side, this allows a pipeline
to use any PCollection as additional (side) input to a DoFn, while also
handling whether that side input is in the same window as the parallel
input. It's better described in the programming guide section on Side
Inputs [4], so I won't repeat their utility here. On the execution side,
the data gets to the SDK side worker using the State Channel and API
(described in [3])

At pipeline construction time, an SDK will convert it's own internal
constructs into the Beam Pipeline protos, and in the Go SDK, that happens
in graphx/translate.go. The SDK translates DoFns into ParDoPayloads,
populating a map of SideInput protos [5][6]. But before any of that, the
user needs to be able to specify side inputs are to be used at all. So how
does a user do that in Go?

*# Part 1: Decomposition*

In the Go SDK, because at time of design, Generics didn't exist (not till
Go 1.18 at soonest), we heavily use the original generic concept: Functions
with typed parameters. If you squint, funcs can be viewed as generic
(apologies to the PL purists in the crowd). See the SDKs RFC [7] for more.
Throughout this explanation I'll be using loose Go syntax to represent some
concepts. This is not real Go code, and these symbols are arbitrary types,
rather than concrete types. After the User hands us a DoFn, we decompose
it's lifecycle methods (like ProcessElement) to see if it's well formed,
and what inputs it accepts and outputs it emits.

 So a DoFn with it's ProcessElement  method can specify a wide variety of
inputs. The designers ended up decreeing that we'll just specify an order
to the parameters (instead of explicit tags, like Java does), and that will
determine a parameter's role in a ParDo's graph representation. Roughly, as
a pseudo-regular expression, for a ProcessElement it's defined as follows :

func(FnValue, SideInput*, FnEmit*)

Where FnValue means any user element in a PCollection, SideInput can be one
of FnValue, FnIter, FnReIter, and FnEmit represents outputs from this DoFn.
FnIter, and FnReIter represent Beam Iterables, used either for SideInputs,
or for GBK values (mentioned in my last email). This is a dramatic
simplification of the more complete expression for any Lifecycle method
(seen at [8]) which has a more complete ordering. Order is important, as we
don't have much else to go on, with the first FnValue is always the
parallel input. Those Fn* terms are also defined in that same file. This is
how the SDK understands functions and parameters, though decomposition.

In the Go SDK, Side Inputs can be an iterable (represented by a `func(*V)
bool` type), a ReIterable (represented by a `func() func(*V) bool` type),
or a Singleton (represented by the type, V). We use functions to handle
iterables, because of the lack of generics. Similarly, we can and should
handle functional representations of Map access to that data (the subject
of this current discussion, BEAM-3293
<https://issues.apache.org/jira/browse/BEAM-3293>).

To see what a given user type parameter represents, we have helper
functions [9] to make that determination, called in a state machine [10].
To handle map functions for BEAM-3293, new functions, representation kinds,
and cases would need to be added there.

A wrinkle in this is that Side inputs can be a singleton value (a
PCollection with 1 element), so the SDK can't statically know what a DoFn's
parameters mean without the context of it's input PCollections.  Depending
on the input PCollection, a func(K,V) that is decomposed to func(FnValue,
FnValue) could either be a DoFn that processes a PCollection<K> with a
singleton side input PCollection<V>, or a DoFn that processes a
PCollection<KV<K,V>>. For that we need to bind the inputs to the DoFn
parameters.

*# Part 2: Binding*

Now that the SDK understands the shape of the user DoFn, it needs to
understand how it connects to the graph. This is done through binding the
input PCollections to the DoFns parameters.

Pertinent to this discussion is [11] where side inputs kinds are bound to
specific types. This would need to handle and produce the new map kinds.
SideInput Maps are simpler, since we know they *must* be associated with a
PCollection<KV<K, V>>. If the Nth SideInput PCollection doesn't match with
the Nth side input parameter, then an error should occur at binding time.

Binding errors are tricky since the mistake is definitely at the specific
DoFn, but all we know is there's a mismatch between what the DoFn requires
(determined by the earlier Decomposition), and what was provided by
pipeline construction (via beam.SideInput options). But if the shapes
match, and the Go types match, we're golden, and can move onto proto
translation. This should mostly be handled, but worth looking into
specifically when adding a new side input kind.

*# Part 3: Translation to Protos*
Now that the user half of the SDK understands Map function parameter kinds
as side inputs, we need to translate them to the proto pipeline graph. Like
most translations to the beam protos, this happens in graphx/translate.go
at [6]. This is arguably the critical bit, as it defines how the runner is
supposed to serve values.

How are iterable side inputs currently handled? Reading down from [6], when
side inputs are present, the Go SDK will add an additional Transform that
converts the input PCollection<Foo> into a PCollection<KV<K, Foo>> where
the key is always the empty string (""), this is represented by the Go SDK
specific URNIterableSideInputKey URN. All such SDK specific URN are
prefixed by "beam:go" to avoid mixing them up with actual URNs defined by
the Beam protos. This URN will be detected at execution time. Most of that
case statement is to set up that additional Fixed Key Transform in the
graph.  The local side input key (i#), is mapped to pull it's values from
the keyed input (referring to the global key), along with including the
side input in the map with that same local key.

*Aside:* It appears the Go SDK presently ignores concepts like the ViewFn
and WindowMappingFn, one or both of which might allow the SDK to avoid
adding in the additional keying PTransform as a PTransform. This is
Technical Debt, as this is probably not how things are done in the portable
Python and Java SDKs. I'm uncertain how much this may need to change to
enable Map views, but I suspect not at all at this time.

The short version for random access side input maps is, we're saying "we're
passing a PCollection<KV<K,V>> and would like you to be able to spit the
values back out at us on demand". This means all were' putting into the Map
or MultiMap case at line 324 below (co-opted for the functional
interpretations, rather than as single map[K]V values) is the section
starting at line 312, populating  the side input map [12].

*# Part 4: Execution from Protos*

OK, so, at this point, the user has constructed a pipeline, and started a
job with a runner, which in turn spun up workers. Those workers include an
SDK side harness that live largely in the Go SDK exec and harness packages.
It connects to the FnAPI services on the runner side, and  (loosely) starts
to receive ProcessBundleDescriptors which describe subgraphs to process and
their associate data. Focusing still on side inputs..

There are two things to look into to understand what's going on on the
execution side (mostly handled in exec/translate.go [13]),that URN for
Iterable Side inputs, and preparing to read side inputs.

First is how we use that URNIterableSideInputKey urn. That's done at [14],
where all it does is create a special FixedKey execution node [15]. This is
a special node that turns PCollection<V> into PCollection<KV<K, V>>. This
includes if that original V is actually a KV itself, like KV<K2, V2>,
meaning the output would have nested KVs: PCollection<KV<K, KV<K2, V2>>>.
Regardless, in most cases, the next node would be a datasink, which then
encodes the values and outputs them to the runner. This is happening to the
PCollection being used as a side input.  In particular, all values are
essentially wrapped as the value of a KV, and associated with a fixed key,
in this case the empty string ("").

That key is important, since it's the key for the state APIs MultiMap side
input for when Iterables are read. If we didn't associate everything with a
single key, the Runner would allow random access with given key requests,
which is what we're implementing now. Fancy that :D.

On the side of the ParDo that's going to be reading the Side input, it
receives a variety of data that it can use to query the State API, and
process responses from it, such as the port to query against, id for this
set of state, the receiving transform requesting the data, and the window
coder and element coders. [16]
These are wrapped into a side input adapter [17] and passed to the ParDo
for actual execution [18]

*Aside:* How about that, a comment about how we aren't using view_fn and
window_mapping_fn... Likely critical for using side inputs with triggers
and non-Global windows... No matter for now.

*# Part 5: Reading the Side Input*

Elsewhere the harness has received the configuration to start executing the
plan. Data is about to flow through the Data channel, and provide our DoFn
with primary input elements. But what about side inputs?

Before each primary input element, we do some initialization to make sure
that we have the right data to pass along along with it. Notionally this is
a bit of light caching to ensure that the element we're executing shares a
window with the cached data, but if it's not yet initialized or it's not,
then we need to rebuild it from scratch. This is in ParDo.initSideInput
[19].  Each side input configuration was wrapped into an adapter that knows
how to produce iterables or whatever else it needs, which is happening
here. This will most likely need to change to handle maps as well, as this
was built with the assumption of only having iterables. This work changes
this assumption.

The Go SDK abstracts streams of data from wherever they're coming from with
ReStreams and Streams [20], and ReusableInputs [21]. The streams are how
decoded data is provided or reset, while the ReusableInputs are for
rewinding those without necessarily making new runner requests.

This likely won't need to change entirely with maps, but at present, we
haven't made it simple to provide a new key for querying.

How do we use user side keys for querying though? The SideInputAdapter
NewIterable demonstrates how: We take the key, encode it, and the window,
and request the element stream from the StateReader [22]. The StateReader
abstracts actually calling the State service for us and its OpenSideInput
method should continue to serve that purpose for us. It is implemented in
harness/statemgr.go [23]. The StateReader is also where most of the
protocol in the state API doc [3] is implemented.

With a simple iterable (say func(*V) bool), the user passes an allocated v
pointer to the iterable, and then something happens, and if it's
successful, the function returns true, indicating the value is ready for
use. Same if it were a func(*K,*V) bool. These functions are generated via
reflection in exec/inputs.go [24] or code generation before compile time.
We use this approach because generics do not exist, so we had no
alternative. Until they do (next year sometime), map side inputs will need
to do the same. We need reflection or code generation so we can have the
actual type used by the user back in part 1, otherwise invoking the DoFn
will fail at runtime. See the Design RFC [7] for the rationale.

Lets say we're using a random access function, with function types like
func(K) func(*V) bool, we need a struct to handle it's own state (the
references to streams and such) and have a method that can pretend to be
that type. For iterators that type is iterValue [25], and it's invoke
method [26].The invoke method matches the signature for the
reflect.MakeFunc method, which can then produce a value with the right
function type. You can see makeIter [27] for how that call happens.

So say we have a mapValue type, with an invoke method, it would need to
take in it's key argument, and pass that to some ReStream object. ReStream
is an interface type, so we can contrive things so that we know it's
concrete type, and type assert it to it (say v.s.(mapReStream)), at which
point we pass it the key. Then we receive the stream of values as normal,
which can then be handled by an instance of iter.

Note, I've changed the user type to `func(K) func(*V) bool` because at this
point it can and could be a multimap, which would require it's own
iteration. We can always add additional variations if we like, but it
certainly complicates part 1 and part 5. At this point, once we can iterate
through multimaps directly, I'd be willing to wait for generics where most
of the complexity can be simplified into generic parent structs (like
beam.KV, or beam.Iterator, or beam.ReIterator or beam.MultiMap...).

This is far from fully complete, but it this should be able to get you
started I think?

Cheers,
Robert Burke
Who can't quite write these emails in his sleep, but did know everywhere to
look in the Go SDK.

[1]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto
[2]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/fn-execution/src/main/proto/beam_fn_api.proto
[3] https://s.apache.org/beam-fn-state-api-and-bundle-processing
[4] https://beam.apache.org/documentation/programming-guide/#side-inputs
[5]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L1186
[6]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L278
[7] https://s.apache.org/beam-go-sdk-design-rfc
[8]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L379
[9] hhttps://
github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/sideinput.go#L32
[10]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L318
[11]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/graph/bind.go#L179

[12]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L312

[13]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go
[14]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L464
[15]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L108
[16]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402

[17]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L48
[18]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L38
[19]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L254
[20]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L57
[21]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L36
[22]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L59
[23]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
[24]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107
[25]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L98
[26]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L153
[27]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107


On Mon, Mar 29, 2021 at 5:38 PM Ahmet Altay <[email protected]> wrote:

> Adding some folks who might be able to help: @Robert Burke
> <[email protected]> @Kenneth Knowles <[email protected]> @Tyson Hamilton
> <[email protected]>
>
> On Mon, Mar 29, 2021 at 2:31 PM Miguel Anzo Palomo <
> [email protected]> wrote:
>
>> Hi,
>> I was checking out this task BEAM-3293
>> <https://issues.apache.org/jira/browse/BEAM-3293> and I'm having some
>> issues fully understanding the idea of how side inputs work internally. Is
>> there any resource or specific example that can help to better understand
>> how they work and why/where the lazy map implementation would help so I can
>> get a better grasp of the task?
>>
>> Thanks
>>
>> --
>>
>> Miguel Angel Anzo Palomo | WIZELINE
>>
>> Software Engineer
>>
>> [email protected]
>>
>> Remote Office
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>

Reply via email to