I made the modification to fuse topics and subscriptions into a single parameter, and I now think it may be cleaner. I have attached a file with both versions.
Let me know what you think :D

Thanks

On Wed, Sep 9, 2020 at 5:35 PM Iñigo San Jose <josein...@google.com> wrote:
>
> Hi everyone!
>
> I have seen a very common use case in Beam: pipelines that read from
> multiple PubSub topics or multiple subscriptions and then flatten them.
> In general, this makes the pipeline harder to understand, since the graph
> gives little context about what is being read.
>
> I was thinking of adding a PTransform that reads from a list of
> Topics/Subscriptions with a simple Extend to the actual ReadFromPubSub
> Transform. This approach would help both developing and debugging the
> pipelines:
>
> - No time spent developing a multi reader
> - Easier organization of Topics/Subscriptions
> - Pipeline graph easier on the eye and less convoluted
> - Faster debugging
> - Avoids issues when pipelines are too wide and hide other parts of the
>   pipeline
>
> As mentioned, I only have the PTransform based on an Extend, but in the
> future implementing this with Splittable DoFn would be the way to go. The
> PTransform takes 3 optional parameters:
>
> - topics: list of topics
> - subscriptions: list of subscriptions
> - with_context: boolean. If True, it adds the topic/subscription name
>   to the message, so each element becomes a tuple of
>   (topic/subscription name, message). This could be helpful for future
>   aggregations. The name may need to change.
>
> The parameters `topics` and `subscriptions` could be fused into a single
> parameter, using the path [1] to tell whether it's a topic or a
> subscription. But I consider it cleaner this way.
>
> Please find attached the current class I made as well as some screenshots
> of how the pipeline looks.
>
> Since I don't know much about SplittableDoFns yet, I was considering
> making a Pull Request for this PTransform and, in the meantime, working
> on a SplittableDoFn version.
>
> Thanks a lot for your time, let me know what you think
> Iñigo
>
> [1] projects/<PROJECT>/subscriptions/<NAME>
>     projects/<PROJECT>/topics/<NAME>
>

--
Iñigo San Jose | josein...@google.com
Big Data Technical Solutions Engineer
Google Cloud Platform
Google, Dublin
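
For reference, here is a minimal sketch of how the path format in [1] could be used to tell a topic from a subscription. It assumes fully qualified paths with exactly four segments; `classify_pubsub_path` is a hypothetical helper written for illustration, not something that exists in Beam.

```python
def classify_pubsub_path(path):
    """Split a full Pub/Sub path into (kind, project, name).

    Hypothetical helper: `kind` is 'topic' or 'subscription', chosen so
    that it matches the keyword argument names of ReadFromPubSub.
    """
    kinds = {'topics': 'topic', 'subscriptions': 'subscription'}
    parts = path.split('/')

    # Expected layout: projects/<PROJECT>/<topics|subscriptions>/<NAME>
    well_formed = (
        len(parts) == 4
        and parts[0] == 'projects'
        and parts[1] != ''
        and parts[2] in kinds
        and parts[3] != ''
    )
    if not well_formed:
        raise ValueError(
            'Not a valid topic or subscription path: %s' % path)

    return kinds[parts[2]], parts[1], parts[3]


kind, project, name = classify_pubsub_path(
    'projects/my-project/subscriptions/my-sub')
print(kind, project, name)
```

Because the returned kind matches the keyword names, a reader could in principle be built with `ReadFromPubSub(**{kind: path})`, which keeps the topic and subscription branches in one place.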
from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    """Reads from lists of topics and subscriptions, then flattens them."""

    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # Use None defaults so instances never share a mutable list.
        self.topics = topics or []
        self.subscriptions = subscriptions or []
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []

        for topic in self.topics:
            # Path format: projects/<PROJECT>/topics/<NAME>
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]

            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}'
                >> ReadFromPubSub(topic=topic)
            )

            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Bind the loop variable as a default argument so each step
                # keeps its own topic instead of the last one in the list.
                current_topic = current_topic | name >> Map(
                    lambda x, topic=topic: (topic, x))

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            # Path format: projects/<PROJECT>/subscriptions/<NAME>
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]

            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}'
                >> ReadFromPubSub(subscription=subscription)
            )

            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                current_subscription = current_subscription | name >> Map(
                    lambda x, subscription=subscription: (subscription, x))

            topics_subscriptions_pcol.append(current_subscription)

        return tuple(topics_subscriptions_pcol) | Flatten()


class PubSubMultipleReaderV2(PTransform):
    """Same reader, with topics and subscriptions fused into one list."""

    def __init__(self, source_list=None, with_context=False):
        super().__init__()
        self.source_list = source_list or []
        self.with_context = with_context

    def expand(self, pcol):
        sources_pcol = []

        for source in self.source_list:
            # Path format: projects/<PROJECT>/<topics|subscriptions>/<NAME>
            source_split = source.split('/')
            source_project = source_split[1]
            source_type = source_split[2]
            source_name = source_split[-1]

            step_name_base = f'PubSub {source_type}/project:{source_project}'

            if source_type == 'topics':
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}'
                    >> ReadFromPubSub(topic=source)
                )
            # elif, not if: otherwise every topic falls through to the
            # else branch below and raises ValueError.
            elif source_type == 'subscriptions':
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}'
                    >> ReadFromPubSub(subscription=source)
                )
            else:
                raise ValueError(
                    'Not a valid topic or subscription path: %s' % source)

            if self.with_context:
                current_source = (
                    current_source
                    | f'{step_name_base}/Add Keys {source_name}'
                    >> Map(lambda x, source=source: (source, x))
                )

            sources_pcol.append(current_source)

        return tuple(sources_pcol) | Flatten()
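
A note on the "Add Keys" steps, which build a lambda inside a `for` loop: in plain Python, a closure looks up the loop variable when it is called, not when it is defined. Whether that affects a given runner depends on when the function gets serialized, so treat the following as a sketch of the Python behaviour only, not as a statement about Beam internals. The function names are made up for the illustration.

```python
def build_taggers_late(sources):
    """Each lambda reads `source` at call time (late binding)."""
    taggers = []
    for source in sources:
        taggers.append(lambda x: (source, x))
    return taggers


def build_taggers_bound(sources):
    """Each lambda captures its own `source` through a default argument."""
    taggers = []
    for source in sources:
        taggers.append(lambda x, source=source: (source, x))
    return taggers


paths = ['projects/p/topics/a', 'projects/p/topics/b']

# After the loop has finished, every late-bound lambda sees the last path.
late_results = [tag('msg') for tag in build_taggers_late(paths)]

# The default-argument version keeps one path per lambda.
bound_results = [tag('msg') for tag in build_taggers_bound(paths)]

print(late_results)
print(bound_results)
```

Binding through a default argument is the smallest change that makes each step tag messages with its own source regardless of when the lambda ends up being evaluated.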