Thanks Ankur for the feedback. Does anyone have any examples they could share of writing an SDF for an unbounded source (ideally in Python)?
The SDF I wrote looks something like this. It works fine using the DirectRunner with the pipeline option --direct_running_mode='in_memory', but doesn’t seem to operate properly with --direct_running_mode='multi_processing'. I’d like to run this in the DataflowRunner, but I wanted to make sure the SDF was operating correctly first. Any tips would be greatly appreciated!

from typing import Any, Iterator  # used by the type comments below

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

from myeventlibrary import Event, Subscriber


class InfRestrictionProvider(RestrictionProvider):
    """An infinite restriction provider."""

    def initial_restriction(self, element):
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1


@beam.typehints.with_output_types(Event)
class _EventReadSDF(beam.DoFn):
    """An SDF for subscribing to custom events."""

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        # type: () -> None
        super(_EventReadSDF, self).__init__()
        # Number of events received.
        self._counter = 0

    def process(self, _, restriction_tracker=restriction_tracker):
        # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
        if not restriction_tracker.try_claim(self._counter):
            return
        subscriber = Subscriber()
        try:
            # Blocks until the next event is received.
            yield subscriber.get()
        finally:
            self._counter += 1
            restriction_tracker.defer_remainder()

On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka <goe...@google.com> wrote:

> Hi Sam,
>
> An SDF can reject split requests, so an SDF can be made to run as a single
> instance.
> DoFn.unbounded_per_element lets the Beam model know that this is an
> unbounded source.
> Beam also tries to infer it:
> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
>
> As this will eventually run on a cluster, I would recommend going via the
> SDF route so that watermarks and checkpoints can happen appropriately, not
> just for this transform but for downstream transforms as well.
>
> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne <samb...@gmail.com> wrote:
>
>> Hello! I’m looking for some help writing a custom transform that reads
>> from an unbounded source. A basic version of this would look something
>> like this:
>>
>> import apache_beam as beam
>> import myeventlibrary
>>
>> class _ReadEventsFn(beam.DoFn):
>>     def process(self, unused_element):
>>         subscriber = myeventlibrary.Subscriber()
>>         while True:
>>             # Blocks until an event is received.
>>             event = subscriber.get()
>>             yield event
>>
>> with beam.Pipeline() as pipe:
>>     (
>>         pipe
>>         | beam.Impulse()
>>         | beam.ParDo(_ReadEventsFn())
>>         | beam.Map(print)
>>     )
>>
>> I have a few questions:
>>
>> - When executing this in the DirectRunner with the default number of
>>   workers (1), is it correct to assume the process would be constantly
>>   blocking and rarely processing other parts of the pipeline?
>> - I noticed there is a decorator, DoFn.unbounded_per_element, which
>>   seems useful, but it’s a bit unclear what it does. I also noticed in
>>   the Java docs that it is an error to apply this decorator if the DoFn
>>   is not an SDF.
>> - I took a stab at writing this as an SDF. The incoming events are not
>>   really something that can be read in parallel, and they trickle in
>>   slowly. In this scenario, is there anything to gain by using an SDF?
>
> An SDF can reject the split request, so an SDF can be made to run as a
> single instance.
>
>> Thanks!
>> -Sam
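For anyone trying to reason about why the claim/defer loop in the SDF above behaves the way it does, here is a toy model of the contract, runnable without Beam. Note that ToyTracker and process_once are simplified stand-ins I made up for this sketch, not Beam’s real OffsetRestrictionTracker or DoFn machinery; they only mimic the pattern of "claim one offset, emit one element, defer the remainder so the runner can checkpoint and resume":

```python
class ToyTracker:
    """Simplified stand-in for a restriction tracker over [start, stop)."""

    def __init__(self, start, stop):
        self.start = start      # next unclaimed offset
        self.stop = stop        # exclusive end; float('inf') means unbounded
        self.deferred = False

    def try_claim(self, offset):
        # A claim succeeds only for the next offset inside the range.
        if offset < self.start or offset >= self.stop:
            return False
        self.start = offset + 1
        return True

    def defer_remainder(self):
        # Signals: stop this invocation here, resume later from self.start.
        self.deferred = True


def process_once(tracker, counter, pending):
    """One SDF-style process() invocation: claim, emit one event, defer."""
    if not tracker.try_claim(counter):
        return None, counter
    event = pending.pop(0)      # stands in for the blocking subscriber.get()
    tracker.defer_remainder()
    return event, counter + 1


# Simulate the runner repeatedly resuming the deferred restriction.
pending = ['a', 'b', 'c']
tracker = ToyTracker(0, float('inf'))
counter = 0
out = []
while pending:
    event, counter = process_once(tracker, counter, pending)
    out.append(event)
print(out)  # ['a', 'b', 'c']
```

Each invocation claims exactly one offset and then defers, which is what lets the runner checkpoint between events; the infinite stop means try_claim never exhausts the restriction, matching the unbounded-source intent of the original SDF.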