I've done something similar. I have a task at the front of the DAG that ensures the connection pool exists, creating it if it doesn't. I've pasted my code below. This runs in a for loop that creates one DAG per iteration, each with its own pool. Then I pass the pool name into the sensors.
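The surrounding loop looks roughly like this. This is only a sketch of the pattern, not the actual code: `WORKFLOWS`, `pool_name_for`, and `dag_pools` are illustrative names I've made up, and the real version would construct the DAG, the `ensure_redshift_pool` task, and the sensors inside the loop body.

```python
# Sketch of the one-pool-per-DAG pattern described above.
# All names here are illustrative, not from the real code.

WORKFLOWS = ['daily_load', 'hourly_load']  # hypothetical workflow names


def pool_name_for(workflow):
    # One dedicated pool per workflow/DAG keeps concurrency limits
    # from crossing DAG boundaries.
    return 'redshift_pool_{}'.format(workflow)


dag_pools = {}
for workflow in WORKFLOWS:
    pool = pool_name_for(workflow)
    # In the real DAG file, this is where the DAG object, the
    # ensure_redshift_pool task, and the sensors (with pool=pool)
    # would be created for this iteration.
    dag_pools[workflow] = pool
```

Because each iteration derives its own pool name, runs of one DAG never contend for another DAG's slots.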
Does this work for your use case?

--
# Imports implied by the snippet (Airflow 1.x module paths):
import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)

redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)


@provide_session
def ensure_redshift_pool(name, slots, session=None):
    pool = Pool(pool=name, slots=slots)
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    pool_query_result = pool_query.one_or_none()
    if not pool_query_result:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(pool)
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')
--

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story <https://stackoverflow.com/story/taylor>

On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com> wrote:

> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and after
> they complete, tears down the cluster. I want to limit concurrency for the
> computation tasks carried out on this cluster to a fixed number. So
> logically, I need a pool that is exclusive to the cluster created by a
> task. I don't want interference with other DAGs or different runs of the
> same DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation
> tasks are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
>
> However, this way the computation tasks will never be triggered. So I
> think the pool parameter is saved in the task instance before being
> templated. I would like to hear your thoughts on how to achieve the
> desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services