I made the modification to fuse topics and subscriptions, I now think it
may be cleaner. Attached a file with both versions.

Let me know what you think :D

Thanks

On Wed, Sep 9, 2020 at 5:35 PM Iñigo San Jose <josein...@google.com> wrote:

>
> Hi everyone!
>
> I have seen a very common use case in Beam which is pipelines that read
> from multiple PubSub topics or multiple subscriptions to end up flattening
> them. In general, this makes the pipeline harder to understand since not
> much context can be taken from it.
>
> I was thinking of adding a PTransform that reads from a list of
> Topics/Subscriptions with a simple Extend to the actual ReadFromPubSub
> Transform. This approach would help both developing and debugging the
> pipelines:
>
>    - No time spent developing a multi reader
>    - Easier organization of Topics/Subcriptions
>    - Pipeline graph easier to the eye and less convoluted
>    - Faster debugging
>    - Avoid issues when pipelines are too wide and hide other parts of the
>    pipeline
>
> As mentioned, I only have the PTransform based on an Extend, but in the
> future implementing this with Splittable DoFn would be the way to go. The
> PTransform takes 3 optional parameters:
>
>    - topics: list of topics
>    - subscriptions: list of subscriptions
>    - with_context: boolean. If True it adds the topic/subscription name
>    to the message, so it becomes a tuple of (topic/subs name, message). This
>    could be helpful for future aggregations. The name may need to change.
>
> The parameters `topics` and `subscriptions` may be fused in a single
> parameter and use the path [1] to know if it's a topic or a subscription.
> But I consider it cleaner this way.
>
> Please find attached the current class I made as well as some screenshots
> of how the pipeline looks.
>
> Since I don't know much about SplittableDoFns yet, I was considering
> making a Pull Request for this PTransform and, on the meanwhile, work on a
> SplittableDoFn version.
>
> Thanks a lot for your time, let me know what you think
> Iñigo
>
> [1] projects/<PROJECT>/subscriptions/<NAME>
> projects/<PROJECT>/topics/<NAME>
>


-- 

*•** Iñigo San Jose | josein...@google.coom*
*•** Big Data *Technical Solutions Engineer
*• *Google Cloud Platform
• *Google, Dublin*
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam import Flatten, Map, PTransform


class PubSubMultipleReader(PTransform):
    def __init__(self, topics=[], subscriptions=[], with_context=False):
        self.topics = topics
        self.subscriptions = subscriptions
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []
        for topic in self.topics:
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]
            current_topic = (
                pcol | f'PubSub Topics/project:{topic_project}/Read {topic_name}' >> ReadFromPubSub(
                topic=topic)
            )
            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                current_topic = current_topic | name >> Map(
                    lambda x: (topic, x))

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]
            current_subscription = (
                pcol | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}' >> ReadFromPubSub(
                    subscription=subscription)
            )
            if self.with_context:
                name = f"PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}"
                current_subscription = current_subscription | name >> Map(
                    lambda x: (subscription, x))

            topics_subscriptions_pcol.append(current_subscription)

        return tuple(topics_subscriptions_pcol) | Flatten()


class PubSubMultipleReaderV2(PTransform):
    def __init__(self, source_list=[], with_context=False):
        self.source_list = source_list
        self.with_context = with_context

    def expand(self, pcol):
        sources_pcol = []
        for source in self.source_list:
            source_split = source.split('/')
            source_project = source_split[1]
            source_type = source_split[2]
            source_name = source_split[-1]

            step_name_base = f'PubSub {source_type}/project:{source_project}'

            if source_type == 'topics':
                current_source = (
                    pcol | f'{step_name_base}/Read {source_name}' >> ReadFromPubSub(
                        topic=source)
                )
            if source_type == 'subscriptions':
                current_source = (
                    pcol | f'{step_name_base}/Read {source_name}' >> ReadFromPubSub(
                        subscription=source)
                )
            else:
                raise ValueError(
                    'Not a valid topic or subscription path: %s' % source)

            if self.with_context:
                current_source = current_source | f'{step_name_base}/Add Keys {source_name}' >> Map(
                    lambda x: (source, x))

            sources_pcol.append(current_source)

        return tuple(sources_pcol) | Flatten()

Reply via email to