Re: Looping in Dataflow (Creating multiple jobs for a while loop)
If you want a single pipeline then, as Robert mentioned, you need a streaming pipeline, which requires an unbounded source (like Kafka or GCP Pub/Sub). In your example you are creating your source from a fixed list, which by definition is a bounded source.

JC

Robert Bradshaw wrote on Mon., 16 Sep 2019, 21:38:

> An external scheduler would also create a new job every time. The only
> way I see to continuously process results in a single job is to have a
> streaming job.
Re: Looping in Dataflow (Creating multiple jobs for a while loop)
An external scheduler would also create a new job every time. The only way I see to continuously process results in a single job is to have a streaming job.

On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi wrote:

> Hi Juan,
>
> Thanks for the reply! I want to know if there is any way in Dataflow to
> achieve this before trying an external scheduler.
>
> Regards,
> Anjana
RE: Looping in Dataflow (Creating multiple jobs for a while loop)
Hi Juan,

Thanks for the reply! I want to know if there is any way in Dataflow to achieve this before trying an external scheduler.

Regards,
Anjana

From: Juan Carlos Garcia [jcgarc...@gmail.com]
Sent: Monday, September 16, 2019 11:23 AM
To: user@beam.apache.org
Cc: Richard Amrith Lourdu
Subject: Re: Looping in Dataflow (Creating multiple jobs for a while loop)

> Hi Anjana,
>
> You need to separate the pipeline definition from what happens when you
> call *run* on the pipeline; given that, you need to externalize the
> scheduling using something like a crontab, Jenkins, or another mechanism.
>
> Best regards,
> JC

---
The information contained in this communication is intended solely for the use of the individual or entity to whom it is addressed and others authorized to receive it. It may contain confidential or legally privileged information. If you are not the intended recipient you are hereby notified that any disclosure, copying, distribution or taking any action in reliance on the contents of this information is strictly prohibited and may be unlawful. If you are not the intended recipient, please notify us immediately by responding to this email and then delete it from your system. Bahwan Cybertek is neither liable for the proper and complete transmission of the information contained in this communication nor for any delay in its receipt.
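For the externalized scheduling Juan suggests, a crontab entry is the usual shape. The paths below are hypothetical, and note one practical caveat: cron's one-minute floor cannot express the 30-second cadence from the original question, so a finer interval would need a different scheduler (or a sleep inside the script).

```
# m h dom mon dow  command   (hypothetical paths; cron cannot fire more often than once a minute)
*/1 * * * * /usr/bin/python3 /opt/pipelines/run_surveys.py >> /var/log/surveys.log 2>&1
```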
RE: Looping in Dataflow (Creating multiple jobs for a while loop)
Hi Robert,

Thanks for the reply! Yes, I want a single pipeline instead of a new job every 30 seconds. In the approach you mentioned, it creates a new job every time.

Regards,
Anjana

From: Robert Bradshaw [rober...@google.com]
Sent: Monday, September 16, 2019 11:51 AM
To: user
Cc: Richard Amrith Lourdu
Subject: Re: Looping in Dataflow (Creating multiple jobs for a while loop)

> You can put your pipeline definition inside your while loop, which will
> run pipelines until your script terminates, but this will fire off a new
> batch job every 30+ seconds. It sounds like what you'd really want to do
> here is have a single streaming pipeline.
Re: Looping in Dataflow (Creating multiple jobs for a while loop)
You can put your pipeline definition inside your while loop, i.e.

    while True:
        to_date = time.time()
        p = beam.Pipeline(options=PipelineOptions())
        (p
         | 'create surveys' >> beam.Create(id_list)
         | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
         | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
        )
        from_date = to_date + timedelta(microseconds=1)
        time.sleep(30)
        p.run().wait_until_finish()

which will run pipelines until your script terminates, but this will fire off a new batch job every 30+ seconds. It sounds like what you'd really want to do here is have a single streaming pipeline.

On Mon, Sep 16, 2019 at 11:24 AM Juan Carlos Garcia wrote:

> Hi Anjana,
>
> You need to separate the pipeline definition from what happens when you
> call *run* on the pipeline; given that, you need to externalize the
> scheduling using something like a crontab, Jenkins, or another mechanism.
>
> Best regards,
> JC
Re: Looping in Dataflow (Creating multiple jobs for a while loop)
Hi Anjana,

You need to separate the pipeline definition from what happens when you call *run* on the pipeline; given that, you need to externalize the scheduling using something like a crontab, Jenkins, or another mechanism.

Best regards,
JC

On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote:

> Hi,
>
> I am trying to run a task using an infinite while loop with changing
> input parameters, as below, but it creates a new job every time. (I
> expect it to use the same pipeline that is created before the while loop
> and run as a single job.)
>
>     p = beam.Pipeline(options=PipelineOptions())
>
>     while True:
>         to_date = time.time()
>         (p
>          | 'create surveys' >> beam.Create(id_list)
>          | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
>          | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
>         )
>         from_date = to_date + timedelta(microseconds=1)
>         time.sleep(30)
>         p.run().wait_until_finish()
>
> It works properly (in only one job) when there is no while loop, as below:
>
>     p = beam.Pipeline(options=PipelineOptions())
>
>     (p
>      | 'create surveys' >> beam.Create(id_list)
>      | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
>      | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
>     )
>
>     p.run().wait_until_finish()
>
> Could someone please suggest how to make the task run in the same job
> instead of creating multiple jobs.
>
> Please let me know if any additional information is needed.
>
> Thanks,
> Anjana

--
JC
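Reading the whitespace-mangled snippet above as calling run() inside the loop, the multiple-jobs behaviour follows from how a pipeline object works: each loop iteration appends another copy of the three transforms to the same graph, and every run() submits that whole graph as a fresh job. A plain-Python stand-in (not the real Beam API) makes the accumulation visible:

```python
# Plain-Python stand-in for a Beam pipeline (illustration only): the
# "graph" is just a list of transforms, and run() submits whatever the
# list currently holds as one new job.

class FakePipeline:
    def __init__(self):
        self.transforms = []      # grows across loop iterations
        self.jobs_submitted = 0

    def apply(self, *names):
        # Applying transforms only extends the graph; nothing executes yet.
        self.transforms.extend(names)

    def run(self):
        # Every call to run() submits the accumulated graph as a new job.
        self.jobs_submitted += 1
        return len(self.transforms)

p = FakePipeline()
graph_sizes = []
for _ in range(3):  # stands in for the `while True:` loop
    p.apply('create surveys', 'get data', 'send to output')
    graph_sizes.append(p.run())

print(graph_sizes)       # [3, 6, 9] -- the graph keeps growing
print(p.jobs_submitted)  # 3 -- three separate jobs, not one
```

This is why both the original loop and the loop-around-the-definition variant launch a job per iteration, and why a single continuous job requires a streaming pipeline instead.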