David, I was playing around with this over the weekend, and mostly found that it doesn't seem to be possible. I was able to get an operator to template out the pool attribute when it renders its templates. However, that rendering doesn't normally happen until execution, so the un-templated pool attribute gets used when the scheduler sends the task to the executor.
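Roughly, what I was experimenting with looked like the sketch below (illustrative only, not my exact code; the operator name, the pool name, and the dag object are placeholders):

--

# Sketch: subclass an operator and add 'pool' to its templated fields so
# that Jinja renders it from the run context.
from airflow.operators.python_operator import PythonOperator

class PoolTemplatedPythonOperator(PythonOperator):
    # 'pool' gets rendered along with the operator's usual templated fields
    template_fields = PythonOperator.template_fields + ('pool',)

compute = PoolTemplatedPythonOperator(
    task_id='compute_1',
    python_callable=lambda: None,
    # This does render per run, but only at execution time; the scheduler
    # still sees the literal '{{ run_id }}' string when it checks pool slots.
    pool='cluster_pool_{{ run_id }}',
    dag=dag,  # assumes a DAG object defined elsewhere
)

--

So the rendered pool name never influences scheduling, which is the step that actually enforces the slot limit.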
Chris

On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer <crisp...@gmail.com> wrote:

> I see, so for a given DagRun you want to limit the compute tasks that are
> running, but I'm guessing you want multiple DagRuns to be able to run
> concurrently and operate on their own clusters independently.
>
> From what I could tell in the code, the pool gets checked before execution
> (which is when templates are rendered), which makes dynamic pools difficult
> to do.
>
> It's probably possible to find a solution, but I think it's likely going to
> involve some ugly code/inspection of the Python stack.
>
> Chris
>
> On Sep 21, 2018 4:47 PM, "David Szakallas" <dszakal...@whitepages.com> wrote:
>
> Chris, the tasks are independent of each other, so they can run
> concurrently. I have to limit the concurrency, though, so they don't starve.
> As the cluster is created dynamically by a task, a shared pool with other
> DAGs or other runs of the same DAG is not preferable.
>
> I imagined something like this:
>
>                            .——> [compute_1]  ——.
>                           / ——> [compute_2]  —— \
>                          /           .           \
> [create_cluster] —> [create_pool_x6]      .      [delete_pool] —> [delete_cluster]
>                          \           .           /
>                           \ ——> [compute_19] —— /
>                            .——> [compute_20] ——.
>
> Thanks,
> David
>
>
> > On Sep 21, 2018, at 7:23 PM, Chris Palmer <ch...@crpalmer.com> wrote:
> >
> > What would cause multiple computation tasks to run on the cluster at the
> > same time? Are you worried about concurrent DagRuns? Does setting dag
> > concurrency and/or task concurrency appropriately solve your problem?
> >
> > Chris
> >
> > On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakal...@whitepages.com>
> > wrote:
> >
> >> What I am doing is very similar. However, I am including the DagRun's id
> >> in the pool name to make it unique, as I need to make sure every run gets
> >> its own pool. I am getting that from the context object, which is only
> >> available within execute methods or templates. How do you make sure each
> >> run has its own pool?
> >>
> >> Thanks,
> >>
> >> Dávid Szakállas
> >> Software Engineer | Whitepages Data Services
> >>
> >> ________________________________
> >> From: Taylor Edmiston <tedmis...@gmail.com>
> >> Sent: Thursday, September 20, 2018 6:17:05 PM
> >> To: dev@airflow.incubator.apache.org
> >> Subject: Re: Creating dynamic pool from task
> >>
> >> I've done something similar. I have a task at the front of the DAG that
> >> ensures the connection pool exists and creates the pool if it doesn't.
> >> I've pasted my code below. This runs in a for loop that creates one DAG
> >> per iteration, each with its own pool. Then I pass the pool name into the
> >> sensors.
> >>
> >> Does this work for your use case?
> >>
> >> --
> >>
> >> redshift_pool = PythonOperator(
> >>     task_id='redshift_pool',
> >>     dag=dag,
> >>     python_callable=ensure_redshift_pool,
> >>     op_kwargs={
> >>         'name': workflow.pool,
> >>         'slots': REDSHIFT_POOL_SLOTS,
> >>     },
> >>     ...
> >> )
> >>
> >> @provide_session
> >> def ensure_redshift_pool(name, slots, session=None):
> >>     pool = Pool(pool=name, slots=slots)
> >>     pool_query = (
> >>         session.query(Pool)
> >>         .filter(Pool.pool == name)
> >>     )
> >>     pool_query_result = pool_query.one_or_none()
> >>     if not pool_query_result:
> >>         logger.info(f'redshift pool "{name}" does not exist - creating it')
> >>         session.add(pool)
> >>         session.commit()
> >>         logger.info(f'created redshift pool "{name}"')
> >>     else:
> >>         logger.info(f'redshift pool "{name}" already exists')
> >>
> >> --
> >>
> >> *Taylor Edmiston*
> >> Blog <https://blog.tedmiston.com/> | LinkedIn
> >> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> >> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
> >> <https://stackoverflow.com/story/taylor>
> >>
> >>
> >> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com>
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I have a DAG that creates a cluster, starts computation tasks, and after
> >>> they have completed, tears down the cluster. I want to limit concurrency
> >>> for the computation tasks carried out on this cluster to a fixed number.
> >>> So logically, I need a pool that is exclusive to the cluster created by a
> >>> task. I don't want interference with other DAGs or different runs of the
> >>> same DAG.
> >>>
> >>> I thought I could solve this problem by creating a pool dynamically from
> >>> a task after the cluster is created and deleting it once the computation
> >>> tasks are finished. I thought I could template the pool parameter of the
> >>> computation tasks to make them use this dynamically created pool.
> >>>
> >>> However, this way the computation tasks will never be triggered. So I
> >>> think the pool parameter is saved in the task instance before being
> >>> templated. I would like to hear your thoughts on how to achieve the
> >>> desired behavior.
> >>>
> >>> Thanks,
> >>>
> >>> Dávid Szakállas
> >>> Software Engineer | Whitepages Data Services
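For reference, a minimal sketch of the per-DAG pool pattern Taylor describes above (one DAG per loop iteration, each throttled by its own pool passed to tasks as a plain string) could look something like the following. WORKFLOWS, make_dag, run_compute, and REDSHIFT_POOL_SLOTS are placeholders, not code from his setup:

--

# Sketch only: one DAG per workflow, each with its own pool.
from airflow.operators.python_operator import PythonOperator

for workflow in WORKFLOWS:
    dag = make_dag(workflow)  # placeholder factory returning a DAG

    # First task makes sure this DAG's pool exists (Taylor's helper above).
    ensure_pool = PythonOperator(
        task_id='ensure_pool',
        python_callable=ensure_redshift_pool,
        op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
        dag=dag,
    )

    # Downstream work references the pool by a name that is already known
    # when the DAG file is parsed, so no templating is involved.
    compute = PythonOperator(
        task_id='compute',
        python_callable=run_compute,  # placeholder callable
        pool=workflow.pool,
        dag=dag,
    )

    ensure_pool >> compute

    # Expose each generated DAG at module level so Airflow discovers it.
    globals()[dag.dag_id] = dag

--

The difference from templating the pool per run is that the pool name here is fixed at parse time, so the scheduler already knows which pool to check for free slots.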