Hey Beam devs,

So several months ago I posted my Go SDF proposal and got a lot of good
feedback (thread
<https://lists.apache.org/thread.html/327bc72a0b30e18c6152b562bac2613c0edc942465d67b215830819e%40%3Cdev.beam.apache.org%3E>,
doc <https://s.apache.org/beam-go-sdf>). Since then I've been working on
implementing it and I've got an initial prototype ready to show off! It
works with initial splitting on Flink, and has a decently documented API.
In the second part of this email I'll also be proposing changes to the
original doc, based on my experience working on this prototype.

To be clear, this is *not* ready to officially go into Beam yet; the API is
still likely to go through changes. Rather, I'm sharing this to show
that progress is being made on SDFs, and to provide some context for the
changes I'll be proposing below.

Here's a link to the repo and branch so you can download it, and a link to
the changes specifically:
Repo: https://github.com/youngoli/beam/tree/gosdf
Changes:
https://github.com/apache/beam/commit/28140ee3471d6cb80e74a16e6fd108cc380d4831

If you give it a try and have any thoughts, please let me know! I'm open to
any and all feedback.

==================================

Proposed Changes
Doc: https://s.apache.org/beam-go-sdf (Select "Version 1" from version
history.)

For anyone reading this who hasn't already read the doc above, I suggest
reading it first, since I'll be referring to concepts from it.

After working on the prototype I've changed my mind on the original
decisions to go with an interface approach and a combined restriction +
tracker. But I don't want to go all in and create another doc with a
detailed proposal, so I've laid out a brief summary of the changes to get
some initial feedback before I go ahead and start working on these changes
in detail. Please let me know what you think!

*1. Change from native Go interfaces to dynamic reflection-based API.*

Instead of the native Go interfaces (SplittableDoFn, RProvider, and
RTracker) described in the doc and implemented in the prototype, use the
same dynamic approach that the Go SDK already uses for DoFns: Use the
reflection system to examine the names and signatures of methods in the
user's DoFn, RProvider, and RTracker.

Original approach reasoning:

   - Simpler, so faster to implement and less bug-prone.
   - The extra burden on the user to keep types consistent is OK, since
   most users of SDFs are more advanced.

Change reasoning:

   - In the prototype, I found interfaces to require too much extra
   boilerplate which added more complexity than expected. (Examples: Constant
   casting,
   - More consistent API: Inconsistency between regular DoFns (dynamic) and
   SDF API (interfaces) was jarring and unintuitive when implementing SDFs as
   a user.

Implementation: Full details are up for discussion, but the goal is to make
the RProvider and RTracker interfaces dynamic, so we can replace all
instances of interface{} in the methods with the actual element types (i.e.
fake generics). Also, uses of the RProvider and RTracker interfaces in
signatures can be replaced with the implementations of those
providers/trackers. This will require a good amount of additional work in
the DoFn validation codebase and the code generator. A fair amount of
additional user code validation and testing will also be needed, since the
new code is more complex.

*2. Separate the restriction tracker and restriction.*

Currently the API has the restriction combined with the tracker. In most
other SDKs and within the SDF model, the two are usually separate concepts,
and this change is to follow that approach and split the two.

Original approach reasoning:

   - It was considered simpler to avoid another level of type casting in
   the API with the interface approach.

Change reasoning:

   - We are no longer going with the interface approach. With "fake
   generics", it is simpler to keep the two concepts separate.
   - Requiring users to specify custom coders in order to only encode the
   restriction and not the tracker ended up adding additional complexity
   anyway.

Implementation: In the API have the restriction tracker initialized with a
restriction object accessible via a getter. The restriction itself will be
the only thing serialized, so it will be wrapped and unwrapped with the
tracker before the user code is invoked. This would add very little work,
as it would mostly be bundled with the interface->dynamic approach change.


Thanks,
Daniel Oliveira
