What would cause multiple computation tasks to run on the cluster at the same time? Are you worried about concurrent DagRuns? Does setting dag concurrency and/or task concurrency appropriately solve your problem?
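For reference, the settings in question look roughly like this in the 1.10-era API (a minimal sketch only; the dag_id, command, and numeric values are illustrative):

--

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# concurrency caps how many task instances of this DAG may run at once
# (across all of its active DagRuns); max_active_runs caps concurrent DagRuns.
dag = DAG(
    dag_id='cluster_computation',        # illustrative name
    start_date=datetime(2018, 9, 1),
    schedule_interval='@daily',
    concurrency=4,
    max_active_runs=1,
)

# task_concurrency limits concurrent instances of this single task across runs.
compute = BashOperator(
    task_id='compute',
    bash_command='echo run computation',  # placeholder command
    task_concurrency=2,
    dag=dag,
)

--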
Chris

On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakal...@whitepages.com> wrote:

> What I am doing is very similar. However, I am including the DagRun's id in
> the pool name to make it unique, as I need to make sure every run gets its
> own pool. I am getting that from the context object, which is only
> available within execute methods or templates. How do you make sure each
> run has its own pool?
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
>
> ________________________________
> From: Taylor Edmiston <tedmis...@gmail.com>
> Sent: Thursday, September 20, 2018 6:17:05 PM
> To: dev@airflow.incubator.apache.org
> Subject: Re: Creating dynamic pool from task
>
> I've done something similar. I have a task at the front of the DAG that
> ensures the connection pool exists and creates the pool if it doesn't.
> I've pasted my code below. This runs in a for loop that creates one DAG
> per iteration, each with its own pool. Then I pass the pool name into the
> sensors.
>
> Does this work for your use case?
>
> --
>
> redshift_pool = PythonOperator(
>     task_id='redshift_pool',
>     dag=dag,
>     python_callable=ensure_redshift_pool,
>     op_kwargs={
>         'name': workflow.pool,
>         'slots': REDSHIFT_POOL_SLOTS,
>     },
>     ...
> )
>
> @provide_session
> def ensure_redshift_pool(name, slots, session=None):
>     pool = Pool(pool=name, slots=slots)
>     pool_query = (
>         session.query(Pool)
>         .filter(Pool.pool == name)
>     )
>     pool_query_result = pool_query.one_or_none()
>     if not pool_query_result:
>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>         session.add(pool)
>         session.commit()
>         logger.info(f'created redshift pool "{name}"')
>     else:
>         logger.info(f'redshift pool "{name}" already exists')
>
> --
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
> <https://stackoverflow.com/story/taylor>
>
>
> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com>
> wrote:
>
> > Hi all,
> >
> > I have a DAG that creates a cluster, starts computation tasks, and after
> > they complete, tears down the cluster. I want to limit concurrency for
> > the computation tasks carried out on this cluster to a fixed number. So
> > logically, I need a pool that is exclusive to the cluster created by a
> > task. I don't want interference with other DAGs or different runs of the
> > same DAG.
> >
> > I thought I could solve this problem by creating a pool dynamically from
> > a task after the cluster is created and deleting it once the computation
> > tasks are finished. I thought I could template the pool parameter of the
> > computation tasks to make them use this dynamically created pool.
> >
> > However, this way the computation tasks will never be triggered. So I
> > think the pool parameter is saved in the task instance before being
> > templated. I would like to hear your thoughts on how to achieve the
> > desired behavior.
> >
> > Thanks,
> >
> > Dávid Szakállas
> > Software Engineer | Whitepages Data Services
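For completeness, a per-run variant of Taylor's helper along the lines David describes -- deriving the pool name from the DagRun's id via the task context -- could look like the sketch below. The names (ensure_run_pool, delete_run_pool, CLUSTER_POOL_SLOTS) are illustrative, it assumes a surrounding `dag` object, and it only covers creating and removing the pool; how the computation tasks pick up the per-run pool name is exactly the templating question left open above.

--

import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)

CLUSTER_POOL_SLOTS = 4  # illustrative cap for the per-run pool


@provide_session
def ensure_run_pool(slots, session=None, **context):
    # Build a unique pool name from this run's id, as David describes.
    pool_name = 'cluster_pool_{}'.format(context['dag_run'].run_id)
    if not session.query(Pool).filter(Pool.pool == pool_name).one_or_none():
        session.add(Pool(pool=pool_name, slots=slots))
        session.commit()
        logger.info('created pool "%s"', pool_name)
    return pool_name


@provide_session
def delete_run_pool(session=None, **context):
    # Tear the per-run pool down once the computation tasks are finished.
    pool_name = 'cluster_pool_{}'.format(context['dag_run'].run_id)
    session.query(Pool).filter(Pool.pool == pool_name).delete()
    session.commit()


create_pool = PythonOperator(
    task_id='create_run_pool',
    python_callable=ensure_run_pool,
    op_kwargs={'slots': CLUSTER_POOL_SLOTS},
    provide_context=True,
    dag=dag,  # assumes the surrounding DAG object
)

delete_pool = PythonOperator(
    task_id='delete_run_pool',
    python_callable=delete_run_pool,
    provide_context=True,
    dag=dag,
)

--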