[ https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AIRFLOW-4355 started by t oo.
-------------------------------------

> Externally triggered DAG is marked as 'success' even if a task has been 'removed'!
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4355
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4355
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun, scheduler
>    Affects Versions: 1.10.3
>            Reporter: t oo
>            Assignee: t oo
>            Priority: Blocker
>              Labels: dynamic
>             Fix For: 2.0.0
>         Attachments: dag_success_even_if_task_removed.png, treeview.png
>
> note: all my DAGs are purely externally triggered
>
> *Issue:* The DAG has 5 parallel tasks that ran successfully and 1 final task that somehow ended up in the 'removed' state (prior DAG runs had it in the 'failed' state). That final task never ran successfully, yet the DAG run is still shown as 'success'!
>
> *Command ran* (note that previous commands like airflow trigger_dag -e 20190412 qsr_coremytbl were run before and failed for a valid reason, i.e. the python task failing):
>
> airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '{"hourstr":"08"}'
>
> *some logs on the prior instance of airflow (the ec2 was autohealed):*
>
> [2019-04-18 08:29:40,678] {logging_mixin.py:95} INFO - [2019-04-18 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
> [2019-04-18 08:29:43,582] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,582] {__init__.py:4906} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
> [2019-04-18 08:29:43,618] {jobs.py:1787} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]> in ORM
> [2019-04-18 08:29:43,676] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
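>
> My reading of the warnings above, as a minimal sketch (my own illustration: reconcile_task_instances is a made-up name, this is not the actual scheduler code): when the scheduler checks an existing dag run against the currently parsed DAG and a task_id is no longer found, the task instance is flipped to 'removed'; if a later parse contains the task again it is 'restored'. With a dynamically generated DAG, a single bad parse is enough to bounce a task through this cycle.
>
> {noformat}
> # illustration only -- paraphrases the "Marking it as removed" / "Restoring task" messages above
> from airflow.utils.state import State
>
> def reconcile_task_instances(dag, task_instances):
>     """Flip TIs whose task_id vanished from the parsed DAG to 'removed',
>     and restore them if the task_id shows up again on a later parse."""
>     for ti in task_instances:
>         if not dag.has_task(ti.task_id):
>             # this parse of the DAG file no longer contains the task
>             ti.state = State.REMOVED
>         elif ti.state == State.REMOVED:
>             # the task came back, e.g. the next parse of the dynamic DAG succeeded
>             ti.state = State.NONE
> {noformat}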
> *some logs on the newer ec2:*
>
> [myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | grep -v 'airflow-webserver-access.log'
>  2 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', u'json')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl rendered xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl task xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
> 71 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  1 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
> 71 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>
> mysql> *select * from task_instance where task_id like '%REP%';*
>
> |task_id|dag_id|execution_date|start_date|end_date|duration|state|try_number|hostname|unixname|job_id|pool|queue|priority_weight|operator|queued_dttm|pid|max_tries|executor_config|
> |REPAIR_HIVE_mytbl|qsr_coremytbl|2019-04-13 00:00:00.000000|2019-04-17 20:29:19.639059|2019-04-17 20:29:33.700252|14.0612|failed|1|ip-ec2|myuser|305|NULL|default|1|PythonOperator|2019-04-17 20:29:13.604354|899|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-01 00:00:00.000000|2019-04-17 21:30:11.439127|2019-04-17 21:30:11.439142|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-02 00:00:00.000000|2019-04-17 21:46:34.163607|2019-04-17 21:46:34.163627|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 00:00:00.000000|2019-04-17 21:50:48.541224|2019-04-17 21:50:48.541239|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 05:00:00.000000|2019-04-17 22:00:24.286685|2019-04-17 22:00:24.286709|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 00:00:00.000000|2019-04-17 21:26:08.621737|2019-04-17 21:26:22.686882|14.0651|failed|1|ip-ec2|myuser|316|NULL|default|1|PythonOperator|2019-04-17 21:26:03.083885|29638|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 17:00:00.000000|2019-04-17 22:44:52.900860|2019-04-17 22:45:04.403179|11.5023|failed|1|ip-ec2|myuser|348|NULL|default|1|PythonOperator|2019-04-17 22:44:47.895363|10815|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 22:00:00.000000|2019-04-17 22:37:28.409799|2019-04-17 22:37:41.448494|13.0387|failed|1|ip-ec2|myuser|342|NULL|default|1|PythonOperator|2019-04-17 22:37:23.449554|28697|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 00:00:00.000000|2019-04-17 21:02:36.365150|2019-04-17 21:02:36.365165|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 07:00:00.000000|2019-04-18 08:24:59.552695|2019-04-18 08:24:59.552710|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 08:00:00.000000|NULL|NULL|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
>
> 11 rows in set (0.00 sec)
>
> *Task Instance Details*
>
> Dependencies Blocking Task From Getting Scheduled:
> || Dependency || Reason ||
> | Task Instance State | Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run. |
> | Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'success'. |
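>
> What I suspect happens next, again only as a sketch with made-up names (decide_run_state is not the real DagRun.update_state code): when the run state is evaluated, a leaf task instance sitting in 'removed' counts neither as unfinished nor as failed, so once the five load tasks succeed nothing is left to hold the run open and it gets concluded as 'success' even though REPAIR_HIVE never ran.
>
> {noformat}
> # illustration of the suspected run-state decision, not actual Airflow code
> from airflow.utils.state import State
>
> UNFINISHED = (State.NONE, State.SCHEDULED, State.QUEUED,
>               State.RUNNING, State.UP_FOR_RETRY)
>
> def decide_run_state(leaf_task_instances):
>     if any(ti.state in UNFINISHED for ti in leaf_task_instances):
>         return State.RUNNING
>     if any(ti.state in (State.FAILED, State.UPSTREAM_FAILED)
>            for ti in leaf_task_instances):
>         return State.FAILED
>     # 'removed' falls through to here together with 'success' / 'skipped'
>     return State.SUCCESS
> {noformat}
>
> If that is right the fix belongs in the run-state evaluation; as a workaround the dependency message above points at clearing the affected task instances, e.g. something like airflow clear qsr_coremytbl -t 'REPAIR_HIVE_.*' -s 2019-04-12 -e 2019-04-13 (flags from memory, check airflow clear --help).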
> Attribute: python_callable
>
> {noformat}
> def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>     conn = BaseHook.get_connection(hive_cli_conn_id)
>     ssl_options = conn.extra_dejson.get('ssl-options')
>     jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>     sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>     hive_command = ' '.join([
>         '/home/myuser/spark_home/bin/beeline',
>         '-u', '"%s"' % jdbc_url,
>         '-n', conn.login,
>         '-w', '/home/myuser/spark_home/hivepw',
>         '-e', '"%s"' % sqlstmt])
>     # note - do not log the command, which contains truststore and hive user passwords
>     logging.info("Executing following SQL statement: %s" % sqlstmt)
>     process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>     (output, error) = process.communicate()
>     logging.info(output)
>     if process.returncode != 0:
>         logging.error('Hive process returned code %d' % process.returncode)
>         raise Exception('Hive process returned code %d' % process.returncode)
> {noformat}
>
> *Task Instance Attributes*
>
> Attribute Value
> dag_id qsr_coremytbl
> duration None
> end_date None
> execution_date 2019-04-12T08:00:00+00:00
> executor_config {}
> generate_command <function generate_command at 0x7f6526f97ed8>
> hostname
> is_premature False
> job_id None
> key ('qsr_coremytbl', u'REPAIR_HIVE_schemeh.mytbl', <Pendulum [2019-04-12T08:00:00+00:00]>, 1)
> log <logging.Logger object at 0x7f65267a2d50>
> log_filepath /home/myuser/airflow/logs/qsr_coremytbl/REPAIR_HIVE_schemeh.mytbl/2019-04-12T08:00:00+00:00.log
> log_url [https://domain/admin/airflow/log?dag_id=qsr_coremytbl&task_id=REPAIR_HIVE_schemeh.mytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00]
> logger <logging.Logger object at 0x7f65267a2d50>
> mark_success_url [https://domain/admin/airflow/success?task_id=REPAIR_HIVE_schemeh.mytbl&dag_id=qsr_coremytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00&upstream=false&downstream=false]
> max_tries 0
> metadata MetaData(bind=None)
> next_try_number 1
> operator None
> pid None
> pool None
> previous_ti None
> priority_weight 1
> queue default
> queued_dttm None
> raw False
> run_as_user None
> start_date None
> state removed
> task <Task(PythonOperator): REPAIR_HIVE_schemeh.mytbl>
> task_id REPAIR_HIVE_schemeh.mytbl
> test_mode False
> try_number 1
> unixname myuser
>
> *Task Attributes*
>
> Attribute Value
> dag <DAG: qsr_coremytbl>
> dag_id qsr_coremytbl
> depends_on_past False
> deps set([<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>])
> downstream_list []
> downstream_task_ids set([])
> email None
> email_on_failure True
> email_on_retry True
> end_date None
> execution_timeout None
> executor_config {}
> inlets []
> lineage_data None
> log <logging.Logger object at 0x7f65167edad0>
> logger <logging.Logger object at 0x7f65167edad0>
> max_retry_delay None
> on_failure_callback None
> on_retry_callback None
> on_success_callback None
> op_args []
> op_kwargs {'table': u'mytbl', 'hive_cli_conn_id': 'query_hive', 'schema': u'schemeh'}
> outlets []
> owner airflow
> params {}
> pool None
> priority_weight 1
> priority_weight_total 1
> provide_context False
> queue default
> resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
> retries 0
> retry_delay 0:05:00
> retry_exponential_backoff False
> run_as_user None
> schedule_interval None
> shallow_copy_attrs ('python_callable', 'op_kwargs')
> sla None
> start_date 2017-06-01T00:00:00+00:00
> task_concurrency None
> task_id REPAIR_HIVE_schemeh.mytbl
> task_type PythonOperator
> template_ext []
> template_fields ('templates_dict', 'op_args', 'op_kwargs')
> templates_dict None
> trigger_rule all_success
> ui_color #ffefeb
> ui_fgcolor #000
> upstream_list [<Task(SparkSubmitOperator): coremytbl_FR__H>, <Task(SparkSubmitOperator): coremytbl_BE__H>, <Task(SparkSubmitOperator): coremytbl_NL__H>, <Task(SparkSubmitOperator): coremytbl_DE__H>, <Task(SparkSubmitOperator): coremytbl_DAGNAME__H>]
> upstream_task_ids set([u'coremytbl_FR__H', u'coremytbl_BE__H', u'coremytbl_NL__H', u'coremytbl_DE__H', u'coremytbl_DAGNAME__H'])
> wait_for_downstream False
> weight_rule downstream
>
> DAG code:
>
> {noformat}
> import datetime as dt
> import glob
> import json
> import logging
> import os
> import subprocess
> #import urllib
>
> from airflow import DAG
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.hooks.base_hook import BaseHook
>
> # Globals
> DATALAKE_S3ROOT = subprocess.check_output(". /etc/pipeline/profile; dig -t TXT +short qsrs3.$pipelineBranch.sss.$networkDomainName | tail -1 | tr -d '\"' | sed -e s#^s3://##", shell=True).strip()
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': dt.datetime(2017, 6, 1),
>     'retries': 0,
>     'retry_delay': dt.timedelta(minutes=5),
> }
>
>
> def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>     conn = BaseHook.get_connection(hive_cli_conn_id)
>     ssl_options = conn.extra_dejson.get('ssl-options')
>     jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>
>     sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>     hive_command = ' '.join([
>         '/home/myuser/spark_home/bin/beeline',
>         '-u', '"%s"' % jdbc_url,
>         '-n', conn.login,
>         '-w', '/home/myuser/spark_home/hivepw',
>         '-e', '"%s"' % sqlstmt])
>     # note - do not log the command, which contains truststore and hive user passwords
>     logging.info("Executing following SQL statement: %s" % sqlstmt)
>
>     process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>     (output, error) = process.communicate()
>     logging.info(output)
>     if process.returncode != 0:
>         logging.error('Hive process returned code %d' % process.returncode)
>         raise Exception('Hive process returned code %d' % process.returncode)
>
>
> def create_spark_submit_operator(dag, pool, runstream, iz_strip_name, table, filename):
>     task_id = "%s_%s" % (runstream, iz_strip_name)
>     # need to handle validation and import
>     filename = iz_strip_name + '_input.yaml'
>     file_path = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'jar/input', filename)
>     return SparkSubmitOperator(
>         dag=dag,  # tell airflow that this task belongs to the dag we defined above
>         task_id=task_id,
>         pool=pool,
>         #params={"lob": lob},  # the sql file above has a template in it for a 'lob' parameter - this is how we pass it in
>         #bash_command='echo "Data Load task 1 {{ params.lob }} here"'
>         conn_id='process',
>         java_class='com.cimp.jar.jar',
>         application='s3a://%s/jar.jar' % DATALAKE_S3ROOT,
>         #total_executor_cores='16',
>         executor_cores='2',
>         executor_memory='8g',
>         driver_memory='2g',
>         conf={"spark.sql.parquet.writeLegacyFormat": "true",
>               "spark.executor.extraJavaOptions": "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}",
>               "spark.driver.extraJavaOptions": "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}",
>               },
>         #num_executors='2',
>         name="%s_{{ ds_nodash }}" % task_id,
>         verbose=True,
>         # dont know how this will work (where do the prior exports get made) or if the commandline way can be used
>         #env_vars={"input_wildcard": os.environ['input_wildcard'], "datestr": os.environ['datestr'], "hourstr": os.environ['hourstr']},
>         application_args=[
>             "--config", "%s" % file_path
>         ],
>     )
>
>
> def create_load_dag(dag_id, runstream):
>     dag = DAG(dag_id,  # give the dag a name
>               schedule_interval=None,  # define how often you want it to run - you can pass cron expressions here
>               default_args=default_args  # pass the default args defined above, or override them here if you want this dag to behave a little differently
>               )
>
>     file_list = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'file_list.json')
>     process = subprocess.Popen(["cat", file_list], stdout=subprocess.PIPE)
>     decoded_data = json.load(process.stdout)
>
>     table_repair_dependencies = {}  # (schema, table) -> [load tasks]
>     for file in decoded_data['files']:
>         srcfile = file['name']
>         iz_strip_name = file['iz_strip_name']
>
>         schema = file['Schema']
>         hive_table = file['hive_table']
>         # One staging task
>         stagingTask = create_spark_submit_operator(dag, None, runstream, iz_strip_name, srcfile, "QSRLOAD")
>         stagingTask.doc_md = """\
>         QSR jar LOAD for %s to %s.%s
>         """ % (srcfile, schema, hive_table)
>         try:
>             table_repair_dependencies[(schema, hive_table)].append(stagingTask)
>         except KeyError:
>             table_repair_dependencies[(schema, hive_table)] = [stagingTask]
>
>     # table repair tasks
>     for (schema, object_name) in table_repair_dependencies.keys():
>         repairHiveTask = PythonOperator(dag=dag,
>                                         task_id="REPAIR_HIVE_%s.%s" % (schema, object_name),
>                                         python_callable=repair_hive_table,
>                                         op_kwargs={'hive_cli_conn_id': 'query_hive',
>                                                    'schema': schema,
>                                                    'table': object_name})
>         repairHiveTask << table_repair_dependencies[(schema, object_name)]
>
>     return dag
>
>
> # dynamically generate all dags
> for entry in os.listdir('/home/myuser/jsonconfigs/si/job'):
>     file_list = glob.glob(os.path.join('/home/myuser/jsonconfigs/si/job', entry, 'file_list.json'))
>     if file_list:
>         runstream = entry
>         dag_id = 'qsr_%s' % runstream
>         globals()[dag_id] = create_load_dag(dag_id, runstream)
> {noformat}
>
> *after encountering this issue i ran*: airflow clear mycooldag
>
> Below was the output... as you can see, the REPAIR_HIVE task was never successful, so I don't know how the DAG can be in an overall 'success' state!!!
>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-01 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-02 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 05:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 00:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 17:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 22:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 07:00:00+00:00 [removed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)