I created a job that lets me trigger other jobs (I needed something until the
REST API comes along, maybe even after). It's an interesting hack: dynamic
tasks created on DAG load, all of them TriggerDagRunOperators. In a nutshell,
the file polls the database on DAG load and creates as many TriggerDagRun
tasks as there are entries (each entry holds a dag name and the JSON data to
push). It works, with some scheduling quirks (of course), but the issue I am
having is that the DAG load and the scheduled run (when the trigger tasks are
executed) are not in sync. So I cannot mark entries as in use, otherwise the
load closest to the run will not find the entries. I tried another approach of
generating a second DAG with these tasks during the scheduled execution of the
first DAG, but that DAG never gets loaded. Why? I know the narrative is quite
confusing, so here are the code files.
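
Both files rely on a Trigger helper from our internal turbine_plugin. For
orientation, here is roughly the interface the code assumes it exposes (a
sketch, not the actual plugin code):

class Trigger(object):
    """Assumed wrapper around the trigger metadata table (hypothetical sketch)."""

    def get_all_dags(self):
        """Return pending entries as dicts, e.g. [{'id': ..., 'dag': ..., 'value': ...}]."""

    def update(self, trigger_id, status):
        """Set the status flag on an entry (0 = pending)."""

    def delete(self, trigger_id):
        """Remove an entry once its trigger has fired."""

And here is the first file: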

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    """Poll the trigger table at DAG parse time and group entries by target dag."""
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    """On failure, flip the entry back to pending so a later run picks it up."""
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    """On success, remove the entry so it does not fire again."""
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess.

    (TriggerDagRunOperator requires a python_callable; this one just copies
    the entry's value into the triggered run's payload.)
    """
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 1),
    'retries': 0,
    'retry_delay': timedelta(seconds=15)
}

dag = DAG('trigger_dag',
          schedule_interval=timedelta(minutes=1),
          default_args=default_args)

# trying everything to make it work
start = DummyOperator(task_id='dummy', owner='airflow', dag=dag)

# Build one chain of TriggerDagRun tasks per target dag at parse time,
# fanning out from the dummy start task.
triggers = get_triggers_by_dag()
for dag_id in triggers.keys():
    last_task = None
    for i, t in enumerate(triggers[dag_id]):
        task = TriggerDagRunOperator(
            task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=str(i)),
            trigger_dag_id=dag_id,
            python_callable=useless_callable,
            params=t,
            dag=dag
        )
        task.on_failure_callback = set_trigger_back
        task.on_success_callback = delete_trigger
        task.set_upstream(last_task if last_task else start)
        last_task = task

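To make the sync problem concrete: the obvious fix would be to claim entries
at parse time, along the lines of the hypothetical variant below, but the
scheduler re-parses this file far more often than it runs it, so the parse
that claims the rows is almost never the one whose tasks actually execute
(status=1 meaning "in use" is an assumption about the trigger table):

from collections import defaultdict
from turbine_plugin.meta.turbine_trigger import Trigger


def claim_triggers_by_dag():
    # Hypothetical variant, not what's deployed: mark each entry as claimed
    # while grouping, so two parses don't build tasks for the same rows.
    trigger = Trigger()
    triggers_by_dag = defaultdict(list)
    for t in trigger.get_all_dags():
        trigger.update(trigger_id=t.get('id'), status=1)  # claim the entry
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag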

And the second one:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    """Poll the trigger table at DAG parse time and group entries by target dag."""
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    """On failure, flip the entry back to pending so a later run picks it up."""
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    """On success, remove the entry so it does not fire again."""
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess.

    (TriggerDagRunOperator requires a python_callable; this one just copies
    the entry's value into the triggered run's payload.)
    """
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 15, 21, 0),
    'retries': 0,
    'retry_delay': timedelta(seconds=5)
}


def generate_new_dag(ds, **kwargs):
    """Run during scheduled execution: build a one-off DAG holding the trigger tasks."""
    triggers = get_triggers_by_dag()
    if triggers:
        dag_new = DAG(
            'semaphor_{time}'.format(time=datetime.now().strftime('%Y%m%d%H%M')),
            schedule_interval='@once',
            default_args=default_args)
        for dag_id in triggers.keys():
            last_task = None
            for i, t in enumerate(triggers[dag_id]):
                task = TriggerDagRunOperator(
                    task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=str(i)),
                    trigger_dag_id=dag_id,
                    python_callable=useless_callable,
                    params=t,
                    dag=dag_new
                )
                task.on_failure_callback = set_trigger_back
                task.on_success_callback = delete_trigger
                if last_task:
                    task.set_upstream(last_task)
                last_task = task

dag = DAG('dag_generator',
          schedule_interval=timedelta(minutes=2),
          default_args=default_args)

start = PythonOperator(task_id='dag_generator',
                       python_callable=generate_new_dag,
                       owner='airflow',
                       provide_context=True,
                       params={},
                       dag=dag)

