You can put your pipeline definition inside your while loop, i.e.

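import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes from_date is initialized before the loop (epoch seconds, like
# time.time()) and id_list, get_api_data, send_to_api come from your script.
while True:
    to_date = time.time()

    # Build a fresh pipeline on every iteration.
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'create surveys' >> beam.Create(id_list)
     # Extra arguments to FlatMap are passed to the function after the
     # element, so each pipeline keeps its own from_date/to_date.
     | 'get data' >> beam.FlatMap(get_api_data, from_date, to_date)
     | 'send to output' >> beam.FlatMap(send_to_api))
    p.run().wait_until_finish()

    # time.time() returns a float, so add one microsecond as a float
    # (a timedelta cannot be added to it).
    from_date = to_date + 1e-6
    time.sleep(30)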

which will run pipelines until your script terminates, but this will
fire off a new batch job every 30+ seconds. It sounds like what you
really want here is a single streaming pipeline.
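
Whichever way you drive it, the from_date/to_date bookkeeping can be kept
separate from the pipeline code. Below is a minimal sketch of that window
arithmetic only, assuming your timestamps are epoch seconds as returned by
time.time(). The names poll_windows, clock and step are made up for
illustration and are not part of Beam.

```python
import time
from typing import Callable, Iterator, Tuple


def poll_windows(
    from_date: float,
    clock: Callable[[], float] = time.time,
    step: float = 1e-6,
) -> Iterator[Tuple[float, float]]:
    """Yield consecutive (from_date, to_date) windows in epoch seconds.

    Hypothetical helper: each window starts `step` seconds after the
    previous one ended, so no interval is requested twice.
    """
    while True:
        to_date = clock()
        yield from_date, to_date
        # Advance the start just past the end of the window we handed out.
        from_date = to_date + step
```

In the loop above you would iterate with "for from_date, to_date in
poll_windows(start):", build and run the pipeline for that window, and then
sleep before asking for the next one.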

On Mon, Sep 16, 2019 at 11:24 AM Juan Carlos Garcia <[email protected]> wrote:
>
> Hi Anjana,
>
> You need to separate your line of thoughts between the pipeline definition vs 
> what happens when you call *run* on the pipeline, given that you need 
> externalize the scheduling using something like a crontab, jenkins, or 
> another mechanism.
>
> Best regards,
> JC
>
> On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <[email protected]> 
> wrote:
>>
>> Hi,
>>
>> I am trying to run a task using an infinite while loop with change in input 
>> parameters as below but it creates a new job for every time. (I expect it to 
>> use same pipeline that is created before while loop and should be in a 
>> single job)
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> while True:
>>     to_date = time.time()
>>
>>     (p
>>         | 'create surveys' >> beam.Create(id_list)
>>            | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>            | 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>            )
>>     from_date = to_date + timedelta(microseconds=1)
>>     time.sleep(30)
>>
>> p.run().wait_until_finish()
>>
>> It works properly(in only one job) when there is no while loop as below:
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>>     (p
>>         | 'create surveys' >> beam.Create(id_list)
>>            | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>            | 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>            )
>>
>> p.run().wait_until_finish()
>>
>> Could someone please suggest how to make the task run in same job instead of 
>> creating multiple jobs.
>>
>> Please let me know in case if any additional information needed.
>>
>> Thanks,
>> Anjana
>>
>
>
>
> --
>
> JC
>
