I created a job that lets me trigger other jobs (I needed something until the REST API comes along, maybe even after that). It's an interesting hack: tasks are created dynamically at DAG load time, and they are TriggerDagRunOperators. In a nutshell, the file polls the database on DAG load and creates as many TriggerDagRun tasks as there are entries (each entry holds a dag name and the JSON data to push). It works, with some scheduling quirks (of course), but the issue I am having is that the DAG load and the scheduled run (when the trigger tasks actually execute) are not in sync. So I cannot mark entries as in use, otherwise the load closest to the run will not find any entries. I tried another approach: generating a second DAG with these tasks during the scheduled execution of the first DAG, but that DAG never gets loaded. Why? I know the narrative is quite confusing, so here are the code files.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    """Read all pending trigger entries and group them by target dag id."""
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    # on failure, release the entry so it can be picked up again
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    # on success, remove the entry for good
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess."""
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 1),
    'retries': 0,
    'retry_delay': timedelta(seconds=15)
}

dag = DAG('trigger_dag',
          schedule_interval=timedelta(minutes=1),
          default_args=default_args)

# trying everything to make it work
start = DummyOperator(task_id='dummy', owner='airflow', dag=dag)

triggers = get_triggers_by_dag()
for dag_id in triggers.keys():
    last_task = None
    for i, t in enumerate(triggers[dag_id]):
        task = TriggerDagRunOperator(
            task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=str(i)),
            trigger_dag_id=dag_id,
            python_callable=useless_callable,
            params=t,
            dag=dag
        )
        task.on_failure_callback = set_trigger_back
        task.on_success_callback = delete_trigger
        # chain the trigger tasks so they execute one after another
        task.set_upstream(last_task if last_task else start)
        last_task = task
And the second one:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    """Read all pending trigger entries and group them by target dag id."""
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    # on failure, release the entry so it can be picked up again
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    # on success, remove the entry for good
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess."""
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 15, 21, 0),
    'retries': 0,
    'retry_delay': timedelta(seconds=5)
}


def generate_new_dag(ds, **kwargs):
    # build a one-off DAG at run time that holds all the trigger tasks
    triggers = get_triggers_by_dag()
    if triggers:
        dag_new = DAG('semaphor_{time}'.format(time=datetime.now().strftime('%Y%m%d%H%M')),
                      schedule_interval='@once',
                      default_args=default_args)
        for dag_id in triggers.keys():
            last_task = None
            for i, t in enumerate(triggers[dag_id]):
                task = TriggerDagRunOperator(
                    task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=str(i)),
                    trigger_dag_id=dag_id,
                    python_callable=useless_callable,
                    params=t,
                    dag=dag_new
                )
                task.on_failure_callback = set_trigger_back
                task.on_success_callback = delete_trigger
                if last_task:
                    task.set_upstream(last_task)
                last_task = task


dag = DAG('dag_generator',
          schedule_interval=timedelta(minutes=2),
          default_args=default_args)

start = PythonOperator(task_id='dag_generator',
                       python_callable=generate_new_dag,
                       owner='airflow',
                       provide_context=True,
                       params={},
                       dag=dag)
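For context, my understanding of how DAG discovery works: the scheduler only collects DAG objects that exist at module level when it parses a file, so a DAG built inside a running task never makes it into the DagBag. Below is a minimal sketch of the parse-time pattern I believe discovery relies on; the `generated_dag_{n}` ids and the loop bound are placeholders for illustration, not from my actual code.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

sketch_args = {'owner': 'airflow', 'start_date': datetime(2016, 12, 1)}

# stand-in for entries that would come from the trigger table
for n in range(3):
    dag_id = 'generated_dag_{n}'.format(n=n)
    generated = DAG(dag_id,
                    schedule_interval='@once',
                    default_args=sketch_args)
    DummyOperator(task_id='noop', dag=generated)
    # this module-level registration is what makes the DagBag collect the DAG;
    # a DAG created inside a PythonOperator callable has no such entry
    globals()[dag_id] = generated

If that's right, the generator approach would only work if the generated DAG ended up in a file (or module globals) that the scheduler actually parses. Is that the reason, or is something else going on?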