If you want a single pipeline, then as Robert mentioned you need a streaming
pipeline, which requires an unbounded source (like Kafka or GCP Pub/Sub).
In your example you are creating your source from a fixed list, which by
definition is a bounded source.
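Here is a minimal sketch of what changes in a streaming pipeline. The `while` loop and the mutable `from_date` go away. Instead, each element produced by a periodic, unbounded source carries its own tick time, and the query window is derived from that tick. Recent Beam Python SDKs provide `PeriodicImpulse` in `apache_beam.transforms.periodicsequence` for this kind of source; check that your SDK version and runner support it. The Beam-free helpers below are hypothetical. `polling_window` and `requests_for_tick` are illustrative names, and the half-open `[from_date, to_date)` convention is an assumption about how `get_api_data` treats its bounds.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Sequence, Tuple

Window = Tuple[datetime, datetime]


def polling_window(tick: datetime, interval: timedelta) -> Window:
    """Hypothetical helper: half-open window [tick - interval, tick) for one tick."""
    return tick - interval, tick


def requests_for_tick(
    tick: datetime, interval: timedelta, ids: Sequence[str]
) -> List[Tuple[str, datetime, datetime]]:
    """Hypothetical helper: fan one tick out into one request per survey id."""
    from_date, to_date = polling_window(tick, interval)
    return [(survey_id, from_date, to_date) for survey_id in ids]


if __name__ == "__main__":
    every = timedelta(seconds=30)
    first_tick = datetime(2019, 9, 16, 12, 0, 30, tzinfo=timezone.utc)
    # Consecutive ticks produce adjacent, non-overlapping windows,
    # so no state has to be carried from one tick to the next.
    for i in range(3):
        tick = first_tick + i * every
        for request in requests_for_tick(tick, every, ["survey-a", "survey-b"]):
            print(request)
```

In a pipeline, `requests_for_tick` would be the body of a `beam.FlatMap` placed right after the periodic source, followed by the existing 'get data' and 'send to output' steps, with 'get data' unpacking each `(id, from_date, to_date)` tuple. If `get_api_data` treats both bounds as inclusive, the 1-microsecond offset from the original loop would still be needed.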

JC

Robert Bradshaw <rober...@google.com> wrote on Mon, 16 Sep 2019, 21:38:

> An external scheduler would also create a new job every time. The only
> way I see to continuously process results in a single job is to have a
> streaming job.
>
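With an external scheduler, each run is a separate job, as Robert notes. That means `from_date` can no longer live in a Python variable between iterations. It has to be stored somewhere that outlives the job. The sketch below assumes a local JSON file as the cursor store; on Dataflow you would more likely use something like a GCS object or a database row. `load_window` and `commit_window` are hypothetical names, and the five-minute lookback for the very first run is an arbitrary assumption.

```python
import json
import os
import tempfile
from datetime import datetime, timedelta
from typing import Tuple


def load_window(
    state_path: str, now: datetime, first_lookback: timedelta = timedelta(minutes=5)
) -> Tuple[datetime, datetime]:
    """Return the half-open window [from_date, to_date) for this scheduled run."""
    if os.path.exists(state_path):
        with open(state_path) as f:
            # Resume exactly where the last successful run stopped.
            from_date = datetime.fromisoformat(json.load(f)["last_to_date"])
    else:
        # No cursor yet (first run): look back a fixed, assumed amount.
        from_date = now - first_lookback
    return from_date, now


def commit_window(state_path: str, to_date: datetime) -> None:
    """Persist to_date; call this only after the job has completed successfully."""
    directory = os.path.dirname(os.path.abspath(state_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_to_date": to_date.isoformat()}, f)
    # Write to a temp file first, then swap it in, so a crash mid-write
    # never leaves a half-written cursor behind.
    os.replace(tmp_path, state_path)
```

A scheduled driver script (run by cron, Jenkins, or similar) would call `load_window`, build the pipeline from the question with those two values, call `p.run().wait_until_finish()`, and only then call `commit_window`. That way, data from a failed run is picked up again by the next run.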
> On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi
> <anjan...@bahwancybertek.com> wrote:
> >
> > Hi Juan,
> >
> > Thanks for the reply! I would like to know whether there is any way in
> > Dataflow to achieve this before trying an external scheduler.
> >
> > Regards,
> > Anjana
> > ________________________________
> > From: Juan Carlos Garcia [jcgarc...@gmail.com]
> > Sent: Monday, September 16, 2019 11:23 AM
> > To: user@beam.apache.org
> > Cc: Richard Amrith Lourdu
> > Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)
> >
> > Hi Anjana,
> >
> > You need to separate your line of thought between the pipeline
> > definition and what happens when you call *run* on the pipeline. Given
> > that, you need to externalize the scheduling using something like crontab,
> > Jenkins, or another mechanism.
> >
> > Best regards,
> > JC
> >
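To make the definition-versus-run distinction concrete, here is a toy model in plain Python. It is not Beam and only mimics the idea: applying a step just records it in a graph, and nothing executes until `run` is called. Beam's Python SDK likewise refuses a second transform whose label is already used in the pipeline, and the toy imitates that. `ToyPipeline` and its methods are hypothetical.

```python
from typing import Callable, Iterable, List, Tuple

Step = Callable[[object], Iterable[object]]


class ToyPipeline:
    """Toy model of a deferred pipeline (not Beam): apply() only records steps."""

    def __init__(self) -> None:
        self._steps: List[Tuple[str, Step]] = []

    def apply(self, label: str, fn: Step) -> "ToyPipeline":
        # Defining a step executes nothing; it only extends the graph.
        if any(existing == label for existing, _ in self._steps):
            # Mimics Beam rejecting a reused transform label.
            raise RuntimeError(f"A step labelled {label!r} already exists")
        self._steps.append((label, fn))
        return self

    def run(self, source: Iterable[object]) -> List[object]:
        # Only now are the recorded steps executed, in order, flat-map style.
        elements = list(source)
        for _, fn in self._steps:
            elements = [out for element in elements for out in fn(element)]
        return elements


if __name__ == "__main__":
    pipeline = ToyPipeline().apply("double", lambda x: [x * 2])
    pipeline.apply("keep big", lambda x: [x] if x > 2 else [])
    print(pipeline.run([1, 2, 3]))  # -> [4, 6]
```

Mapped onto the loop in the question, every iteration calls `apply` on the same pipeline object again instead of executing anything. `run` is only reached after the `while True` loop, which never ends.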
> > On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote:
> >>
> >> Hi,
> >>
> >> I am trying to run a task in an infinite while loop, changing the input
> >> parameters on each iteration as below, but it creates a new job every
> >> time. (I expect it to reuse the pipeline created before the while loop
> >> and to run as a single job.)
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> while True:
> >>     to_date = time.time()
> >>
> >>     (p
> >>      | 'create surveys' >> beam.Create(id_list)
> >>      | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
> >>      | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
> >>     )
> >>     from_date = to_date + timedelta(microseconds=1)
> >>     time.sleep(30)
> >>
> >> p.run().wait_until_finish()
> >>
> >> It works properly (in a single job) when there is no while loop, as below:
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> (p
> >>  | 'create surveys' >> beam.Create(id_list)
> >>  | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
> >>  | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
> >> )
> >>
> >> p.run().wait_until_finish()
> >>
> >> Could someone please suggest how to make the task run in the same job
> >> instead of creating multiple jobs?
> >>
> >> Please let me know if any additional information is needed.
> >>
> >> Thanks,
> >> Anjana
> >>
> >>
> >>
> -----------------------------------------------------------------------------------------------------------------------
> The information contained in this communication is intended solely for the
> use of the individual or entity to whom it is addressed and others
> authorized to receive it. It may contain confidential or legally privileged
> information. If you are not the intended recipient you are hereby notified
> that any disclosure, copying, distribution or taking any action in reliance
> on the contents of this information is strictly prohibited and may be
> unlawful. If you are not the intended recipient, please notify us
> immediately by responding to this email and then delete it from your
> system. Bahwan Cybertek is neither liable for the proper and complete
> transmission of the information contained in this communication nor for any
> delay in its receipt.
> >
> >
> >
> > --
> >
> > JC
> >
> >
>
