Hi all,
Sorry about the wall of text.
So, first of all, I started thinking about this while reviewing a PR by Boyuan
that adds an example of an SDF (Splittable DoFn)[1]. This is very exciting, by the way : ).

Anyway... I certainly have a limited view of the whole SDF effort, but I
think it's worth discussing this particular point about the API before
finalizing SDF and making it widely available. So here I go:

The Python API for SDF asks users to provide a restriction provider as a
default argument in the signature of their process method, more or less like this:

import apache_beam as beam

class MyOwnLittleSDF(beam.DoFn):
  def process(self, element,
              restriction_tracker=MyOwnLittleRestrictionProvider()):
    # My DoFn logic...
    yield element

This works, but one thing I found a little odd is that, at runtime, the
restriction provider gets replaced with a restriction tracker:

class MyOwnLittleSDF(beam.DoFn):
  def process(self, element,
              restriction_tracker=MyOwnLittleRestrictionProvider()):
    # This assert succeeds : )
    assert not isinstance(restriction_tracker,
                          MyOwnLittleRestrictionProvider)

After thinking about it a bit, I realized that the default argument only
tells the runner where to find the restriction provider; what we actually
need at runtime is NOT the restriction provider, but the restriction tracker
that the provider creates.
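To illustrate the replacement, here is a minimal, stdlib-only sketch of what a runner could do with such a signature, assuming it simply inspects the default arguments of process. This is not Beam's actual implementation: ToyRestrictionProvider, ToyOffsetTracker, ToyCharSDF and invoke_process are hypothetical names, although the initial_restriction and create_tracker method names mirror those of Beam's RestrictionProvider.

```python
import inspect


# Hypothetical toy classes illustrating the idea; this is NOT Beam code.
class ToyOffsetTracker:
  """Hands out offsets inside a half-open restriction [start, stop)."""

  def __init__(self, restriction):
    self.start, self.stop = restriction

  def try_claim(self, position):
    # A claim succeeds only while the position lies inside the restriction.
    return self.start <= position < self.stop


class ToyRestrictionProvider:
  """Method names mirror Beam's RestrictionProvider; behavior is a toy."""

  def initial_restriction(self, element):
    return (0, len(element))

  def create_tracker(self, restriction):
    return ToyOffsetTracker(restriction)


def invoke_process(dofn, element):
  """Toy 'runner': swaps each provider default argument for a tracker."""
  kwargs = {}
  for name, param in inspect.signature(dofn.process).parameters.items():
    if isinstance(param.default, ToyRestrictionProvider):
      provider = param.default
      restriction = provider.initial_restriction(element)
      kwargs[name] = provider.create_tracker(restriction)
  return list(dofn.process(element, **kwargs))


class ToyCharSDF:
  def process(self, element, restriction_tracker=ToyRestrictionProvider()):
    # The declared default is a provider, but at runtime we receive a tracker.
    assert isinstance(restriction_tracker, ToyOffsetTracker)
    position = restriction_tracker.start
    while restriction_tracker.try_claim(position):
      yield element[position]
      position += 1


print(invoke_process(ToyCharSDF(), "abc"))  # ['a', 'b', 'c']
```

Calling list(ToyCharSDF().process("abc")) directly, without the toy runner, would trip the assert, because the provider default would be passed through unchanged. That is exactly the gap between what the signature declares and what the method receives.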

A similar pattern occurs with state and timers: the runner needs to know the
kind of state and the coder for its values (or, for timers, the time
domain), but the parameter passed at runtime is something different[2]. For
state and timers (and for the window, timestamp, pane, etc.) we provide a
pattern where users give a default value that is clearly a placeholder, such
as beam.DoFn.TimerParam or beam.DoFn.StateParam.

The resulting API is fairly similar, but (at least in my mind) it makes it
much clearer that the DoFnParam will be replaced with something else at
runtime. A similar change could be made for SDF:

class MyOwnLittleSDF(beam.DoFn):
  MY_RESTRICTION = \
      RestrictionSpec(provider=MyOwnLittleRestrictionProvider())

  def process(
      self, element,
      restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
    # My DoFn logic...
    yield element

Since SDF is still a work in progress, this may be a good opportunity to
consider such a change.
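A minimal, stdlib-only sketch of how a runner might resolve the proposed placeholder, assuming it dispatches on the placeholder's type, as it presumably does for StateParam and TimerParam. RestrictionSpec and RestrictionParam here are plain-Python stand-ins for the proposed (not yet existing) classes; ToyRangeProvider, ToyCountingSDF and invoke_process are hypothetical, and the "tracker" is just an iterator to keep the sketch short.

```python
import inspect


# Hypothetical sketch of the proposed API; none of these classes exist in Beam.
class RestrictionSpec:
  """Declares which restriction provider the runner should use."""

  def __init__(self, provider):
    self.provider = provider


class RestrictionParam:
  """Placeholder default, in the style of DoFn.StateParam / DoFn.TimerParam."""

  def __init__(self, spec):
    self.spec = spec


class ToyRangeProvider:
  """Toy provider: the restriction for element n is range(n)."""

  def initial_restriction(self, element):
    return range(element)

  def create_tracker(self, restriction):
    # Stand-in "tracker": an iterator over the positions in the restriction.
    return iter(restriction)


def invoke_process(dofn, element):
  """Toy runner: replaces every RestrictionParam placeholder with a tracker."""
  kwargs = {}
  for name, param in inspect.signature(dofn.process).parameters.items():
    if isinstance(param.default, RestrictionParam):
      provider = param.default.spec.provider
      kwargs[name] = provider.create_tracker(
          provider.initial_restriction(element))
  return list(dofn.process(element, **kwargs))


class ToyCountingSDF:
  MY_RESTRICTION = RestrictionSpec(provider=ToyRangeProvider())

  def process(self, element,
              restriction_tracker=RestrictionParam(MY_RESTRICTION)):
    # The placeholder default is never used directly; the runner replaces it.
    for position in restriction_tracker:
      yield position


print(invoke_process(ToyCountingSDF(), 3))  # [0, 1, 2]
```

In this shape the runner no longer has to recognize which default happens to be a provider: anything wrapped in RestrictionParam is, by construction, something it must replace.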

Some pros:
- Consistent with other parameters that we pass to DoFn methods
- A bit clearer about what will happen at runtime

Some cons:
- SDF developers are "power users" who will have read the SDF
documentation, so this point will already be clear to them.
- This may create unnecessary work, and perhaps unintended consequences.
- I bet there's more

Thoughts?

-P.

[1] https://github.com/apache/beam/pull/8338
[2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
