You can put your pipeline definition inside your while loop, i.e.

    while True:
        to_date = time.time()
        p = beam.Pipeline(options=PipelineOptions())
        (p
         | 'create surveys' >> beam.Create(id_list)
         | 'get data' >> beam.FlatMap(
             lambda id: get_api_data(id, from_date, to_date))
         | 'send to output' >> beam.FlatMap(
             lambda input: send_to_api(input))
        )
        # Run this iteration's job before advancing the window.
        p.run().wait_until_finish()
        # time.time() returns float seconds, so step by 1e-6 (one
        # microsecond); adding a timedelta to a float raises TypeError.
        from_date = to_date + 1e-6
        time.sleep(30)

which will run pipelines until your script terminates, but this will
fire off a new batch job every 30+ seconds. It sounds like what you'd
really want to do here is have a single streaming pipeline.
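
Whichever way the jobs end up being launched, the from_date/to_date bookkeeping can be kept apart from Beam so it is easy to check on its own. Here is a minimal sketch of that idea, not a definitive implementation: time_windows and its clock parameter are hypothetical names of mine, and it assumes timestamps are float seconds as returned by time.time().

```python
import time
from typing import Callable, Iterator, Tuple

MICROSECOND = 1e-6  # time.time() works in float seconds


def time_windows(
    start: float,
    clock: Callable[[], float] = time.time,
) -> Iterator[Tuple[float, float]]:
    """Yield consecutive (from_date, to_date) windows that never overlap.

    `clock` is injectable so the logic can be tested without waiting.
    """
    from_date = start
    while True:
        to_date = clock()
        yield from_date, to_date
        # The next window begins one microsecond after this one ends.
        from_date = to_date + MICROSECOND


# Hypothetical usage, where run_once builds and runs one pipeline:
#
#     for from_date, to_date in time_windows(start=time.time() - 30):
#         run_once(from_date, to_date)
#         time.sleep(30)
```

Each value the generator yields is fixed at the moment it is produced, so a job always sees the window it was created for, regardless of when the next window is computed.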
On Mon, Sep 16, 2019 at 11:24 AM Juan Carlos Garcia <[email protected]> wrote:
>
> Hi Anjana,
>
> You need to separate the pipeline definition from what happens when you call
> *run* on the pipeline. Given that, you need to externalize the scheduling
> using something like a crontab, Jenkins, or another mechanism.
>
> Best regards,
> JC
>
> On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <[email protected]>
> wrote:
>>
>> Hi,
>>
>> I am trying to run a task in an infinite while loop, changing the input
>> parameters on each iteration as shown below, but it creates a new job every
>> time. (I expect it to use the same pipeline that is created before the while
>> loop, and to run as a single job.)
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> while True:
>>     to_date = time.time()
>>
>>     (p
>>      | 'create surveys' >> beam.Create(id_list)
>>      | 'get data' >> beam.FlatMap(lambda id: get_api_data(id,
>>                                   from_date, to_date))
>>      | 'send to output' >> beam.FlatMap(lambda input:
>>                                   (send_to_api(input)))
>>     )
>>     from_date = to_date + timedelta(microseconds=1)
>>     time.sleep(30)
>>
>>     p.run().wait_until_finish()
>>
>> It works properly (in only one job) when there is no while loop, as below:
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> (p
>> | 'create surveys' >> beam.Create(id_list)
>> | 'get data' >> beam.FlatMap(lambda id: get_api_data(id,
>> from_date, to_date))
>> | 'send to output' >> beam.FlatMap(lambda input:
>> (send_to_api(input)))
>> )
>>
>> p.run().wait_until_finish()
>>
>> Could someone please suggest how to make the task run in the same job
>> instead of creating multiple jobs?
>>
>> Please let me know if any additional information is needed.
>>
>> Thanks,
>> Anjana
>>