I also believe Chris is correct that it's not quite possible to be that
dynamic today. You might find a workaround, though, such as running only
one DAG run per DAG at a time and reusing a single pool; or perhaps it
would work to create the pool based on the DagRun's execution date instead
of its run id?
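For example, the creation side of that idea might look something like the
sketch below (untested; it reuses the ensure_redshift_pool helper from my
earlier message quoted below, and the task id and slot count are just
placeholders). The open question is still whether the compute tasks can
reference the per-date pool name, which is the part Chris found
problematic.

--

def create_pool_for_run(slots, **context):
    # Derive a per-execution-date pool name, e.g. cluster_pool_20180925,
    # and make sure the pool exists before the compute tasks run.
    name = 'cluster_pool_{}'.format(context['ds_nodash'])
    ensure_redshift_pool(name=name, slots=slots)
    return name  # pushed to XCom in case downstream tasks want the name

create_pool = PythonOperator(
    task_id='create_pool',
    dag=dag,
    python_callable=create_pool_for_run,
    provide_context=True,  # pass execution context (incl. ds_nodash)
    op_kwargs={'slots': 6},  # matches the x6 in David's diagram
)

--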
*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>


On Tue, Sep 25, 2018 at 10:24 AM, Chris Palmer <crisp...@gmail.com> wrote:

> David,
>
> I was playing around with this over the weekend, and mostly found that it
> doesn't seem to be possible. I was able to get an operator to template out
> the pool attribute when it renders its templates. However, that rendering
> doesn't normally happen until execution, so the un-templated pool
> attribute gets used when the scheduler sends the task to the executor.
>
> Chris
>
> On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer <crisp...@gmail.com> wrote:
>
> > I see, so for a given DagRun you want to limit the compute tasks that
> > are running. But I'm guessing you want multiple DagRuns to be able to
> > run concurrently, each operating on its own cluster independently.
> >
> > From what I could tell in the code, the pool gets checked before
> > execution (which is when templates are rendered), which makes dynamic
> > pools difficult to do.
> >
> > It's probably possible to find a solution, but I think it's likely
> > going to involve some ugly code/inspection of the Python stack.
> >
> > Chris
> >
> > On Sep 21, 2018 4:47 PM, "David Szakallas" <dszakal...@whitepages.com>
> > wrote:
> >
> > Chris, the tasks are independent of each other, so they can run
> > concurrently. I have to limit the concurrency though, so they don't
> > starve. As the cluster is created dynamically by a task, a pool shared
> > with other DAGs or with other runs of the same DAG is not preferable.
> >
> > I imagined something like this:
> >
> >                                        .——> [compute_1] ——.
> >                                       / ——> [compute_2] —— \
> >                                      /           .          \
> > [create_cluster] —> [create_pool_x6]             .           [delete_pool] —> [delete_cluster]
> >                                      \           .          /
> >                                       \ ——> [compute_19] — /
> >                                        .——> [compute_20] —.
> >
> > Thanks,
> > David
> >
> >
> > > On Sep 21, 2018, at 7:23 PM, Chris Palmer <ch...@crpalmer.com> wrote:
> > >
> > > What would cause multiple computation tasks to run on the cluster at
> > > the same time? Are you worried about concurrent DagRuns? Does setting
> > > DAG concurrency and/or task concurrency appropriately solve your
> > > problem?
> > >
> > > Chris
> > >
> > > On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <
> > > dszakal...@whitepages.com> wrote:
> > >
> > >> What I am doing is very similar. However, I am including the
> > >> DagRun's id in the pool name to make it unique, as I need to make
> > >> sure every run gets its own pool. I am getting that from the context
> > >> object, which is only available within execute methods or templates.
> > >> How do you make sure each run has its own pool?
> > >>
> > >> Thanks,
> > >>
> > >> Dávid Szakállas
> > >> Software Engineer | Whitepages Data Services
> > >>
> > >> ________________________________
> > >> From: Taylor Edmiston <tedmis...@gmail.com>
> > >> Sent: Thursday, September 20, 2018 6:17:05 PM
> > >> To: dev@airflow.incubator.apache.org
> > >> Subject: Re: Creating dynamic pool from task
> > >>
> > >> I've done something similar.
> > >> I have a task at the front of the DAG that ensures the connection
> > >> pool exists and creates the pool if it doesn't. I've pasted my code
> > >> below. This runs in a for loop that creates one DAG per iteration,
> > >> each with its own pool. Then I pass the pool name into the sensors.
> > >>
> > >> Does this work for your use case?
> > >>
> > >> --
> > >>
> > >> import logging
> > >>
> > >> from airflow.models import Pool
> > >> from airflow.operators.python_operator import PythonOperator
> > >> from airflow.utils.db import provide_session
> > >>
> > >> logger = logging.getLogger(__name__)
> > >>
> > >> @provide_session
> > >> def ensure_redshift_pool(name, slots, session=None):
> > >>     pool = Pool(pool=name, slots=slots)
> > >>     pool_query = (
> > >>         session.query(Pool)
> > >>         .filter(Pool.pool == name)
> > >>     )
> > >>     pool_query_result = pool_query.one_or_none()
> > >>     if not pool_query_result:
> > >>         logger.info(f'redshift pool "{name}" does not exist - creating it')
> > >>         session.add(pool)
> > >>         session.commit()
> > >>         logger.info(f'created redshift pool "{name}"')
> > >>     else:
> > >>         logger.info(f'redshift pool "{name}" already exists')
> > >>
> > >> redshift_pool = PythonOperator(
> > >>     task_id='redshift_pool',
> > >>     dag=dag,
> > >>     python_callable=ensure_redshift_pool,
> > >>     op_kwargs={
> > >>         'name': workflow.pool,
> > >>         'slots': REDSHIFT_POOL_SLOTS,
> > >>     },
> > >>     ...
> > >> )
> > >>
> > >> --
> > >>
> > >> *Taylor Edmiston*
> > >> Blog <https://blog.tedmiston.com/> | LinkedIn
> > >> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> > >> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer
> > >> Story <https://stackoverflow.com/story/taylor>
> > >>
> > >>
> > >> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
> > >> dszakal...@whitepages.com> wrote:
> > >>
> > >>> Hi all,
> > >>>
> > >>> I have a DAG that creates a cluster, starts computation tasks, and
> > >>> after they have completed, tears down the cluster. I want to limit
> > >>> concurrency for the computation tasks carried out on this cluster
> > >>> to a fixed number. So logically, I need a pool that is exclusive to
> > >>> the cluster created by a task. I don't want interference with other
> > >>> DAGs or with different runs of the same DAG.
> > >>>
> > >>> I thought I could solve this problem by creating a pool dynamically
> > >>> from a task after the cluster is created, and deleting it once the
> > >>> computation tasks are finished. I thought I could template the pool
> > >>> parameter of the computation tasks to make them use this
> > >>> dynamically created pool.
> > >>>
> > >>> However, this way the computation tasks are never triggered, so I
> > >>> think the pool parameter is saved in the task instance before being
> > >>> templated. I would like to hear your thoughts on how to achieve the
> > >>> desired behavior.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Dávid Szakállas
> > >>> Software Engineer | Whitepages Data Services
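P.S. For the [delete_pool] step in David's diagram, a counterpart to
ensure_redshift_pool could look roughly like this (an untested sketch; it
assumes the same imports as above, i.e. Pool from airflow.models and
provide_session from airflow.utils.db, plus the same logger):

--

@provide_session
def delete_pool(name, session=None):
    # Remove the run-specific pool once the compute tasks are done.
    pool = (
        session.query(Pool)
        .filter(Pool.pool == name)
        .one_or_none()
    )
    if pool is not None:
        session.delete(pool)
        session.commit()
        logger.info(f'deleted pool "{name}"')
    else:
        logger.info(f'pool "{name}" does not exist - nothing to delete')

--

You'd probably also want trigger_rule='all_done' on the deleting task so
the pool is cleaned up even when one of the compute tasks fails.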