I've done something similar.  I have a task at the front of the DAG that
ensures the connection pool exists and creates it if it doesn't.  I've
pasted my code below.  It runs in a for loop that creates one DAG per
iteration, each with its own pool.  I then pass the pool name into the
sensors.

Does this work for your use case?

--

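# Import paths as of Airflow 1.10; in Airflow 2+ provide_session lives in
# airflow.utils.session and PythonOperator in airflow.operators.python.
import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

# Assumed to be a standard-library logger; the original snippet did not
# show how `logger` was defined.
logger = logging.getLogger(__name__)


@provide_session
def ensure_redshift_pool(name, slots, session=None):
    """Create the pool `name` with `slots` slots unless it already exists."""
    pool_query_result = (
        session.query(Pool)
        .filter(Pool.pool == name)
        .one_or_none()
    )
    if not pool_query_result:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')


# First task of each generated DAG: make sure the DAG's pool exists.
redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)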
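
For anyone who wants to try the check-then-create logic without an Airflow
install, here is a minimal sketch that uses the standard library's sqlite3
as a stand-in for the metadata database.  The slot_pool table, the
ensure_pool helper, and the workflow names are all made up for illustration;
none of this is Airflow's API.

```python
import sqlite3


def ensure_pool(conn, name, slots):
    """Create pool `name` if it is missing; return True when a row was created."""
    row = conn.execute(
        "SELECT 1 FROM slot_pool WHERE pool = ?", (name,)
    ).fetchone()
    if row is not None:
        return False  # already exists; leave its slot count untouched
    conn.execute(
        "INSERT INTO slot_pool (pool, slots) VALUES (?, ?)", (name, slots)
    )
    conn.commit()
    return True


# In-memory stand-in for the metadata database (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE slot_pool (pool TEXT PRIMARY KEY, slots INTEGER)")

# One pool per generated DAG; the workflow names are made up.
for workflow_name in ("orders", "users"):
    ensure_pool(conn, f"redshift_{workflow_name}", slots=4)

# A second call for the same name is a no-op and keeps the original slots.
created_again = ensure_pool(conn, "redshift_orders", slots=10)
```

Because the check and the insert are separate statements, two runs starting
at the same moment could both see the pool as missing.  In this sketch the
primary key rejects the second insert, so a real task would probably want to
catch that error and treat it as "already exists".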

--

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>



On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com>
wrote:

> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and, after
> they complete, tears down the cluster. I want to limit concurrency for the
> computation tasks run on this cluster to a fixed number. So logically, I
> need a pool that is exclusive to the cluster created by a task. I don't
> want interference from other DAGs or from different runs of the same DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation tasks
> have finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
>
> However, this way the computation tasks are never triggered. So I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
