I see, so for a given DagRun you want to limit the compute tasks that are
running. But I'm guessing you want multiple DagRuns to be able to run
concurrently to operate on their own clusters independently.

>From what I could tell in the code, the pool gets checked before execution
(which is when templates are rendered). Which makes dynamic pools difficult
to do.

It's probably possible to find a solution but I think it's likely going to
involve some ugly code/inspection of the python stack.

Chris

On Sep 21, 2018 4:47 PM, "David Szakallas" <dszakal...@whitepages.com>
wrote:

Chris, the tasks are independent of each other so they can run
concurrently. I have to limit the concurrency though, so they don’t starve.
As the cluster is created dynamically with a task, a shared pool with other
DAGs or other runs of the same DAG is not preferable.

I imagined something like this:

                                                                    .——>
 [compute_1] ——.
                                                                  / ——>
 [compute_2] ——  \
                                                                /
             .                        \
[create_cluster] —> [create_pool_x6]                         .
          [ delete_pool ] —> [delete cluster]
                                                               \
            .                        /
                                                                 \ ——>
 [compute_19] —— /
                                                                   . ——>
[compute_20] ——.
Thanks,
David


> On Sep 21, 2018, at 7:23 PM, Chris Palmer <ch...@crpalmer.com> wrote:
>
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
>
> Chris
>
> On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakal...@whitepages.com
>
> wrote:
>
>> What I am doing is very similar. However I am including the DagRun's id
in
>> the pool name to make it unique, as I need to make sure every run gets
its
>> own pool. I am getting that from the context object, which is only
>> available within execute methods or templates. How do you make sure each
>> run has it's own pool?
>>
>>
>> Thanks,
>>
>> Dávid Szakállas
>> Software Engineer | Whitepages Data Services
>>
>> ________________________________
>> From: Taylor Edmiston <tedmis...@gmail.com>
>> Sent: Thursday, September 20, 2018 6:17:05 PM
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Creating dynamic pool from task
>>
>> I've done something similar.  I have a task at the front of the DAG that
>> ensures the connection pool exists and creates the pool if it doesn't.
>> I've pasted my code below.  This runs in a for loop that creates one DAG
>> per iteration each with its own pool.  Then I pass the pool name into the
>> sensors.
>>
>> Does this work for your use case?
>>
>> --
>>
>> redshift_pool = PythonOperator(
>>    task_id='redshift_pool',
>>    dag=dag,
>>    python_callable=ensure_redshift_pool,
>>    op_kwargs={
>>        'name': workflow.pool,
>>        'slots': REDSHIFT_POOL_SLOTS,
>>    },
>>    ...
>> )
>>
>> @provide_session
>> def ensure_redshift_pool(name, slots, session=None):
>>    pool = Pool(pool=name, slots=slots)
>>    pool_query = (
>>        session.query(Pool)
>>        .filter(Pool.pool == name)
>>    )
>>    pool_query_result = pool_query.one_or_none()
>>    if not pool_query_result:
>>        logger.info(f'redshift pool "{name}" does not exist - creating
>> it')
>>        session.add(pool)
>>        session.commit()
>>        logger.info(f'created redshift pool "{name}"')
>>    else:
>>        logger.info(f'redshift pool "{name}" already exists')
>>
>> --
>>
>> *Taylor Edmiston*
>> Blog <https://blog.tedmiston.com/> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
>> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer
Story
>> <https://stackoverflow.com/story/taylor>
>>
>>
>>
>> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
>> dszakal...@whitepages.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a DAG that creates a cluster, starts computation tasks, and after
>>> they completed, tears down the cluster. I want to limit concurrency for
>> the
>>> computation tasks carried on this cluster to fixed number. So logically,
>> I
>>> need a pool that is exclusive to the cluster created by a task. I don't
>>> want interference with other DAGs or different runs of the same DAG.
>>>
>>> I thought I could solve this problem by creating a pool dynamically from
>> a
>>> task after the cluster is created and delete it once the computation
>> tasks
>>> are finished. I thought I could template the pool parameter of the
>>> computation tasks to make them use this dynamically created cluster.
>>>
>>> However this way the computation tasks will never be triggered. So I
>> think
>>> the pool parameter is saved in the task instance before being templated.
>> I
>>> would like to hear your thoughts on how to achieve the desired behavior.
>>>
>>> Thanks,
>>>
>>> Dávid Szakállas
>>> Software Engineer | Whitepages Data Services
>>>
>>>
>>>
>>>
>>>
>>

Reply via email to