On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
<[email protected]> wrote:
>
> Hi,
>
> I have a Beam streaming pipeline that reads data from a Pub/Sub topic, uses it
> to call an API, applies some transformations to the responses, and writes
> the results to output sinks.
>
> Now I need to add logic to write a 'process completed' message to another
> Pub/Sub topic once the process has finished. Could someone please
> share thoughts on how to add it?
>
> I actually want to achieve this:
>
> topic1 (data)-> triggers pipeline and writes complete message at end-> topic2
> (complete msg)
Your code below looks fine so far. I'm assuming your send_to_output
function produces the message that you want to send to topic2, right?
(BTW, you can write beam.Map(send_to_output) rather than
beam.Map(lambda output: send_to_output(output)).)
In that case, you just need to add

    data | beam.io.WriteStringsToPubSub(topic2)
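
Roughly, the tail of the pipeline could look like the sketch below. This is only an illustration under assumptions: the body of send_to_output is a hypothetical stand-in (I'm assuming it writes to your sink and returns the string to publish), and build_tail and the 'WriteComplete' label are names I made up for the example.

```python
def send_to_output(output):
    """Hypothetical stand-in for the send_to_output in the original code.

    Assumption: it writes `output` to the real sink and returns the
    string that should be published to topic2.
    """
    # ... write `output` to the real sink here ...
    return 'process completed: %s' % (output,)


def build_tail(responses, topic2):
    """Attach the output step and the topic2 write to `responses`.

    `responses` is assumed to be the PCollection produced by the
    'API call' step; `topic2` is a full Pub/Sub topic path.
    """
    # Imported here so the module still loads where Beam is not installed.
    import apache_beam as beam

    # Pass the function directly; no lambda wrapper is needed.
    data = responses | 'WriteOutput' >> beam.Map(send_to_output)

    # WriteStringsToPubSub is the transform named in this thread. Newer
    # Beam releases use beam.io.WriteToPubSub instead, which expects bytes.
    return data | 'WriteComplete' >> beam.io.WriteStringsToPubSub(topic2)
```

Note that this publishes one message per element that reaches 'WriteOutput', and topic2 needs to be in the projects/<project>/topics/<topic> form.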
> When topic1 sees the message on topic2, it again posts new data to topic1.
>
> Below is the code sample:
>
> pubsub_message = (
>     p
>     | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
>     | 'split and add' >> beam.ParDo(split_item))
>
> data = (
>     pubsub_message
>     | 'API call' >> beam.FlatMap(lambda x: get_responses(
>         x[0],
>         datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
>         datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S")))
>     | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output)))
>
> Thanks,
> Anjana