An external scheduler would also create a new job every time. The only way I see to continuously process results in a single job is to have a streaming job.
On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote: > > Hi Juan, > > Thanks for the reply ! I want to know if there is any way in dataflow to > achieve this before trying external scheduler. > > Regards, > Anjana > ________________________________ > From: Juan Carlos Garcia [jcgarc...@gmail.com] > Sent: Monday, September 16, 2019 11:23 AM > To: user@beam.apache.org > Cc: Richard Amrith Lourdu > Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop) > > Hi Anjana, > > You need to separate your line of thoughts between the pipeline definition vs > what happens when you call *run* on the pipeline, given that you need > externalize the scheduling using something like a crontab, jenkins, or > another mechanism. > > Best regards, > JC > > On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <anjan...@bahwancybertek.com> > wrote: >> >> Hi, >> >> I am trying to run a task using an infinite while loop with change in input >> parameters as below but it creates a new job for every time. (I expect it to >> use same pipeline that is created before while loop and should be in a >> single job) >> >> p = beam.Pipeline(options=PipelineOptions()) >> >> while True: >> to_date = time.time() >> >> (p >> | 'create surveys' >> beam.Create(id_list) >> | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, >> from_date, to_date)) >> | 'send to output' >> beam.FlatMap(lambda input: >> (send_to_api(input))) >> ) >> from_date = to_date + timedelta(microseconds=1) >> time.sleep(30) >> >> p.run().wait_until_finish() >> >> It works properly(in only one job) when there is no while loop as below: >> >> p = beam.Pipeline(options=PipelineOptions()) >> >> (p >> | 'create surveys' >> beam.Create(id_list) >> | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, >> from_date, to_date)) >> | 'send to output' >> beam.FlatMap(lambda input: >> (send_to_api(input))) >> ) >> >> p.run().wait_until_finish() >> >> Could someone please suggest how to make the task run in same job instead of >> creating multiple jobs. >> >> Please let me know in case if any additional information needed. >> >> Thanks, >> Anjana >> >> >> ----------------------------------------------------------------------------------------------------------------------- >> The information contained in this communication is intended solely for the >> use of the individual or entity to whom it is addressed and others >> authorized to receive it. It may contain confidential or legally privileged >> information. If you are not the intended recipient you are hereby notified >> that any disclosure, copying, distribution or taking any action in reliance >> on the contents of this information is strictly prohibited and may be >> unlawful. If you are not the intended recipient, please notify us >> immediately by responding to this email and then delete it from your system. >> Bahwan Cybertek is neither liable for the proper and complete transmission >> of the information contained in this communication nor for any delay in its >> receipt. > > > > -- > > JC > > ----------------------------------------------------------------------------------------------------------------------- > The information contained in this communication is intended solely for the > use of the individual or entity to whom it is addressed and others authorized > to receive it. It may contain confidential or legally privileged information. > If you are not the intended recipient you are hereby notified that any > disclosure, copying, distribution or taking any action in reliance on the > contents of this information is strictly prohibited and may be unlawful. If > you are not the intended recipient, please notify us immediately by > responding to this email and then delete it from your system. Bahwan Cybertek > is neither liable for the proper and complete transmission of the information > contained in this communication nor for any delay in its receipt.