Chris, the tasks are independent of each other, so they can run concurrently. I have to limit the concurrency, though, so they don't starve. Since the cluster is created dynamically by a task, a pool shared with other DAGs or with other runs of the same DAG is not desirable.
I imagined something like this:

                                        .---> [compute_1] -----.
                                       /----> [compute_2] ------\
                                      /              .           \
[create_cluster] --> [create_pool_x6]                .            [delete_pool] --> [delete_cluster]
                                      \              .           /
                                       \----> [compute_19] -----/
                                        `---> [compute_20] ----'

Thanks,
David

> On Sep 21, 2018, at 7:23 PM, Chris Palmer <ch...@crpalmer.com> wrote:
>
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
>
> Chris
>
> On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakal...@whitepages.com> wrote:
>
>> What I am doing is very similar. However, I am including the DagRun's id in
>> the pool name to make it unique, as I need to make sure every run gets its
>> own pool. I am getting that from the context object, which is only
>> available within execute methods or templates. How do you make sure each
>> run has its own pool?
>>
>> Thanks,
>>
>> Dávid Szakállas
>> Software Engineer | Whitepages Data Services
>>
>> ________________________________
>> From: Taylor Edmiston <tedmis...@gmail.com>
>> Sent: Thursday, September 20, 2018 6:17:05 PM
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Creating dynamic pool from task
>>
>> I've done something similar. I have a task at the front of the DAG that
>> ensures the connection pool exists and creates it if it doesn't. I've
>> pasted my code below. This runs in a for loop that creates one DAG per
>> iteration, each with its own pool. Then I pass the pool name into the
>> sensors.
>>
>> Does this work for your use case?
>>
>> --
>>
>> redshift_pool = PythonOperator(
>>     task_id='redshift_pool',
>>     dag=dag,
>>     python_callable=ensure_redshift_pool,
>>     op_kwargs={
>>         'name': workflow.pool,
>>         'slots': REDSHIFT_POOL_SLOTS,
>>     },
>>     ...
>> )
>>
>> @provide_session
>> def ensure_redshift_pool(name, slots, session=None):
>>     pool = Pool(pool=name, slots=slots)
>>     pool_query = (
>>         session.query(Pool)
>>         .filter(Pool.pool == name)
>>     )
>>     pool_query_result = pool_query.one_or_none()
>>     if not pool_query_result:
>>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>>         session.add(pool)
>>         session.commit()
>>         logger.info(f'created redshift pool "{name}"')
>>     else:
>>         logger.info(f'redshift pool "{name}" already exists')
>>
>> --
>>
>> *Taylor Edmiston*
>> Blog <https://blog.tedmiston.com/> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
>> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
>> <https://stackoverflow.com/story/taylor>
>>
>> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a DAG that creates a cluster, starts computation tasks, and, after
>>> they have completed, tears down the cluster. I want to limit the
>>> concurrency of the computation tasks carried out on this cluster to a
>>> fixed number. So logically, I need a pool that is exclusive to the cluster
>>> created by a task. I don't want interference with other DAGs or different
>>> runs of the same DAG.
>>>
>>> I thought I could solve this problem by creating a pool dynamically from a
>>> task after the cluster is created and deleting it once the computation
>>> tasks are finished. I thought I could template the pool parameter of the
>>> computation tasks to make them use this dynamically created pool.
>>>
>>> However, this way the computation tasks will never be triggered.
>>> So I think the pool parameter is saved in the task instance before being
>>> templated. I would like to hear your thoughts on how to achieve the
>>> desired behavior.
>>>
>>> Thanks,
>>>
>>> Dávid Szakállas
>>> Software Engineer | Whitepages Data Services
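
For completeness, below is a minimal sketch of the per-DAG pool pattern Taylor describes above, with a delete step at the end as in David's diagram. It assumes Airflow 1.10-era import paths (PythonOperator, Pool, provide_session); the DAG id, pool name, slot count, and the compute placeholder are illustrative and not taken from the thread. The pool name here is a plain per-DAG string shared by all runs, since templating the pool parameter per run is exactly what does not work above.

--

from datetime import datetime

from airflow import DAG
from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

POOL_NAME = 'cluster_pool'   # illustrative; one pool per DAG, shared by all its runs
POOL_SLOTS = 6               # at most 6 compute tasks run at the same time


@provide_session
def create_pool(name, slots, session=None):
    # Same idea as ensure_redshift_pool above: create the pool only if it is missing.
    if not session.query(Pool).filter(Pool.pool == name).one_or_none():
        session.add(Pool(pool=name, slots=slots))
        session.commit()


@provide_session
def delete_pool(name, session=None):
    # Tear the pool down once all computation tasks have finished.
    session.query(Pool).filter(Pool.pool == name).delete()
    session.commit()


def compute():
    pass  # placeholder for the real computation


dag = DAG('cluster_computation', start_date=datetime(2018, 9, 1), schedule_interval=None)

create_pool_task = PythonOperator(
    task_id='create_pool',
    python_callable=create_pool,
    op_kwargs={'name': POOL_NAME, 'slots': POOL_SLOTS},
    dag=dag,
)

delete_pool_task = PythonOperator(
    task_id='delete_pool',
    python_callable=delete_pool,
    op_kwargs={'name': POOL_NAME},
    dag=dag,
)

for i in range(1, 21):
    compute_task = PythonOperator(
        task_id='compute_{}'.format(i),
        python_callable=compute,
        pool=POOL_NAME,    # a plain string, so the scheduler honors it
        dag=dag,
    )
    create_pool_task >> compute_task >> delete_pool_task

--

The create_cluster and delete_cluster tasks from the diagram are omitted to keep the sketch short; they would simply wrap the chain above.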