Thanks Ankur for the feedback.

Does anyone have any examples they could share of writing an SDF for an
unbounded source (ideally in Python)?

The SDF I wrote looks something like this. It works fine using the
DirectRunner with the PipelineOption --direct_running_mode='in_memory', but
doesn’t seem to operate properly with
--direct_running_mode='multi_processing'. I’d like to run this with the
DataflowRunner, but I wanted to make sure the SDF was operating correctly
first.

Any tips would be greatly appreciated!

from typing import Any, Iterator

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

from myeventlibrary import Event, Subscriber

class InfRestrictionProvider(RestrictionProvider):
    """
    An infinite restriction provider
    """

    def initial_restriction(self, element):
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1

@beam.typehints.with_output_types(Event)
class _EventReadSDF(beam.DoFn):
    """
    An SDF for subscribing to custom events.
    """

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        # type: () -> None
        super(_EventReadSDF, self).__init__()
        # number of events received
        self._counter = 0

    def process(self, _, restriction_tracker=restriction_tracker):
        # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]

        # Attempt to claim the next position; if the claim fails, the
        # runner has decided this restriction is finished.
        if not restriction_tracker.try_claim(self._counter):
            return

        subscriber = Subscriber()
        try:
            # Blocks until the next event is received.
            yield subscriber.get()
        finally:
            self._counter += 1
            # Checkpoint so the runner can resume the remainder of the
            # (infinite) restriction at the next claim position.
            restriction_tracker.defer_remainder()

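In case it helps, this is roughly how I wire the SDF into a pipeline and how
I launch it (a sketch of my test setup; my_pipeline.py is a placeholder
name):

with beam.Pipeline() as pipe:
    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )

# Launched with, e.g.:
#   python my_pipeline.py --runner=DirectRunner --direct_running_mode=multi_processing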

On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka <goe...@google.com> wrote:

> Hi Sam,
>
> An SDF can reject split requests, so an SDF can be made to run as a single
> instance.
> DoFn.unbounded_per_element lets the Beam model know that this is an
> unbounded source. Beam also tries to infer it:
> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
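>
> In Python, the equivalent appears to be the beam.DoFn.unbounded_per_element
> decorator applied to the DoFn class; a rough (untested) sketch:
>
> @beam.DoFn.unbounded_per_element()
> class _EventReadSDF(beam.DoFn):
>     ...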
>
> As this will eventually run on a cluster, I would recommend going the SDF
> route so that watermarks and checkpoints can happen appropriately, not
> just for this transform but for downstream transforms.
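>
> For the watermark side, a minimal sketch (assuming your
> InfRestrictionProvider, and using wall-clock time as the watermark) would
> be to accept a DoFn.WatermarkEstimatorParam in process() and advance it
> manually:
>
> import time
> from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
> from apache_beam.utils.timestamp import Timestamp
>
> def process(
>         self,
>         _,
>         restriction_tracker=beam.DoFn.RestrictionParam(InfRestrictionProvider()),
>         watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
>             ManualWatermarkEstimator.default_provider())):
>     ...
>     # Advance the output watermark as events are emitted.
>     watermark_estimator.set_watermark(Timestamp.of(time.time()))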
>
> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne <samb...@gmail.com> wrote:
>
>> Hello! I’m looking for some help writing a custom transform that reads
>> from an unbounded source. A basic version of this would look something like
>> this:
>>
>> import apache_beam as beam
>> import myeventlibrary
>>
>> class _ReadEventsFn(beam.DoFn):
>>   def process(self, unused_element):
>>     subscriber = myeventlibrary.Subscriber()
>>     while True:
>>       # blocks until an event is received
>>       event = subscriber.get()
>>       yield event
>>
>> with beam.Pipeline() as pipe:
>>   (
>>     pipe
>>     | beam.Impulse()
>>     | beam.ParDo(_ReadEventsFn())
>>     | beam.Map(print)
>>   )
>>
>> I have a few questions:
>>
>>    - When executing this in the DirectRunner with the default number of
>>    workers (1), is it correct to assume the process would be constantly
>>    blocking and rarely processing other parts of the pipeline?
>>    - I noticed there is a decorator, DoFn.unbounded_per_element, which
>>    seems useful, but it’s a bit unclear what it does. I also noticed in the
>>    Java docs that it is an error to apply this decorator if the DoFn is not
>>    an SDF.
>>    - I took a stab at writing this as an SDF. The incoming events are not
>>    really something that can be read in parallel, and they trickle in slowly.
>>    In this scenario, is there anything to gain by using an SDF?
>>
> An SDF can reject the split request, so it can be made to run as a single
> instance.
>
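> For example, the tracker can refuse runner-initiated splits by returning
> None from try_split (rough sketch, subclassing the OffsetRestrictionTracker
> used above):
>
> class UnsplittableOffsetTracker(OffsetRestrictionTracker):
>     def try_split(self, fraction_of_remainder):
>         # Returning None rejects the runner's split request; fraction 0
>         # (self-initiated checkpoints via defer_remainder) still splits.
>         if fraction_of_remainder > 0:
>             return None
>         return super().try_split(fraction_of_remainder)
>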
>>
>>
>> Thanks!
>> -Sam
>>
>
