I made the modification to fuse topics and subscriptions into a single
parameter, and I now think it may be cleaner. Attached is a file with both versions.
Let me know what you think :D
Thanks
On Wed, Sep 9, 2020 at 5:35 PM Iñigo San Jose <[email protected]> wrote:
>
> Hi everyone!
>
> A very common use case I have seen in Beam is pipelines that read from
> multiple PubSub topics or subscriptions and then flatten them. In general,
> this makes the pipeline harder to understand, since the graph gives little
> context about where each input comes from.
>
> I was thinking of adding a PTransform that reads from a list of
> topics/subscriptions, implemented as a simple composite PTransform on top of
> the existing ReadFromPubSub transform. This approach would help with both
> developing and debugging pipelines:
>
> - No time spent developing a multi-reader
> - Easier organization of topics/subscriptions
> - A pipeline graph that is easier to read and less convoluted
> - Faster debugging
> - No overly wide graphs that hide other parts of the pipeline
>
> As mentioned, I only have the version built by extending PTransform, but in
> the future implementing this with a Splittable DoFn would be the way to go.
> The PTransform takes 3 optional parameters:
>
> - topics: list of topics
> - subscriptions: list of subscriptions
> - with_context: boolean. If True, it adds the topic/subscription path
> to each message, so each element becomes a tuple of (topic/subscription
> path, message). This could be helpful for later aggregations. The
> parameter name may need to change.
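A minimal pure-Python sketch of what `with_context` does to each element, assuming plain lists stand in for the per-source PCollections (the helper `add_context`, the paths, and the payloads are all hypothetical). The tag is bound through a factory function, so each source keeps its own path rather than whatever value a loop variable holds last:

```python
def add_context(source):
    """Return a function that pairs each message with its source path."""
    def tag(message):
        return (source, message)
    return tag


# Hypothetical data: plain lists stand in for the per-source PCollections.
# ReadFromPubSub yields bytes payloads by default, hence the b'' literals.
reads = {
    'projects/my-project/topics/orders': [b'o1', b'o2'],
    'projects/my-project/subscriptions/clicks-sub': [b'c1'],
}

# One tagger per source, then "flatten" everything into a single list.
taggers = {source: add_context(source) for source in reads}
flattened = [
    taggers[source](msg) for source, msgs in reads.items() for msg in msgs
]
print(flattened)
```

In a pipeline, the same helper could be applied as `Map(add_context(source))` after each read and before the final `Flatten()`.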
>
> The parameters `topics` and `subscriptions` could be fused into a single
> parameter, using the path [1] to tell whether each entry is a topic or a
> subscription. But I consider it cleaner this way.
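A minimal sketch of how the fused parameter could tell topics from subscriptions using the path formats in [1]. The function `classify_pubsub_path` and its regex are hypothetical, and the check is somewhat stricter than looking only at the third path segment:

```python
import re

# Accepted forms: projects/<PROJECT>/topics/<NAME> and
# projects/<PROJECT>/subscriptions/<NAME>.
_PUBSUB_PATH = re.compile(r'projects/([^/]+)/(topics|subscriptions)/([^/]+)')


def classify_pubsub_path(path):
    """Split a Pub/Sub path into (kind, project, name).

    kind is 'topics' or 'subscriptions'; any other shape raises ValueError.
    """
    match = _PUBSUB_PATH.fullmatch(path)
    if match is None:
        raise ValueError('Not a valid topic or subscription path: %s' % path)
    project, kind, name = match.groups()
    return kind, project, name


print(classify_pubsub_path('projects/my-project/topics/orders'))
```

A fused reader could then call `ReadFromPubSub(topic=path)` when `kind == 'topics'` and `ReadFromPubSub(subscription=path)` otherwise.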
>
> Please find attached the current class I made as well as some screenshots
> of how the pipeline looks.
>
> Since I don't know much about SplittableDoFns yet, I was considering
> making a Pull Request for this PTransform and, in the meantime, working on a
> SplittableDoFn version.
>
> Thanks a lot for your time, and let me know what you think
> Iñigo
>
> [1] projects/<PROJECT>/subscriptions/<NAME>
> projects/<PROJECT>/topics/<NAME>
>
--
• Iñigo San Jose | [email protected]
• Big Data Technical Solutions Engineer
• Google Cloud Platform
• Google, Dublin
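from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    """Reads from several Pub/Sub topics and subscriptions and flattens them."""

    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # None defaults avoid sharing one mutable list between instances.
        self.topics = list(topics or [])
        self.subscriptions = list(subscriptions or [])
        if not self.topics and not self.subscriptions:
            # Flatten() cannot infer the pipeline from an empty tuple.
            raise ValueError('At least one topic or subscription is required.')
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []
        for topic in self.topics:
            # Expected path: projects/<PROJECT>/topics/<NAME>
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]
            step_name_base = f'PubSub Topics/project:{topic_project}'
            current_topic = (
                pcol
                | f'{step_name_base}/Read {topic_name}' >> ReadFromPubSub(
                    topic=topic))
            if self.with_context:
                # topic=topic binds this iteration's value into the lambda.
                current_topic = (
                    current_topic
                    | f'{step_name_base}/Add Keys {topic_name}' >> Map(
                        lambda x, topic=topic: (topic, x)))
            topics_subscriptions_pcol.append(current_topic)
        for subscription in self.subscriptions:
            # Expected path: projects/<PROJECT>/subscriptions/<NAME>
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]
            step_name_base = (
                f'PubSub Subscriptions/project:{subscription_project}')
            current_subscription = (
                pcol
                | f'{step_name_base}/Read {subscription_name}' >> ReadFromPubSub(
                    subscription=subscription))
            if self.with_context:
                # subscription=subscription binds this iteration's value.
                current_subscription = (
                    current_subscription
                    | f'{step_name_base}/Add Keys {subscription_name}' >> Map(
                        lambda x, subscription=subscription: (subscription, x)))
            topics_subscriptions_pcol.append(current_subscription)
        # Merge every per-source PCollection into a single one.
        return tuple(topics_subscriptions_pcol) | Flatten()


class PubSubMultipleReaderV2(PTransform):
    """Same as PubSubMultipleReader, but takes one list of topic/subscription paths."""

    def __init__(self, source_list=None, with_context=False):
        super().__init__()
        self.source_list = list(source_list or [])
        if not self.source_list:
            # Flatten() cannot infer the pipeline from an empty tuple.
            raise ValueError('At least one topic or subscription is required.')
        self.with_context = with_context

    def expand(self, pcol):
        sources_pcol = []
        for source in self.source_list:
            # Expected: projects/<PROJECT>/topics/<NAME> or
            #           projects/<PROJECT>/subscriptions/<NAME>
            source_split = source.split('/')
            if (len(source_split) != 4 or source_split[0] != 'projects'
                    or source_split[2] not in ('topics', 'subscriptions')):
                raise ValueError(
                    'Not a valid topic or subscription path: %s' % source)
            source_project = source_split[1]
            source_type = source_split[2]
            source_name = source_split[-1]
            step_name_base = f'PubSub {source_type}/project:{source_project}'
            read_label = f'{step_name_base}/Read {source_name}'
            if source_type == 'topics':
                current_source = pcol | read_label >> ReadFromPubSub(
                    topic=source)
            else:  # 'subscriptions', already validated above
                current_source = pcol | read_label >> ReadFromPubSub(
                    subscription=source)
            if self.with_context:
                # source=source binds this iteration's value into the lambda.
                current_source = (
                    current_source
                    | f'{step_name_base}/Add Keys {source_name}' >> Map(
                        lambda x, source=source: (source, x)))
            sources_pcol.append(current_source)
        # Merge every per-source PCollection into a single one.
        return tuple(sources_pcol) | Flatten()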