[jira] [Created] (AIRFLOW-3542) next_ds semantics broken for manually triggered runs
Dan Davydov created AIRFLOW-3542: Summary: next_ds semantics broken for manually triggered runs Key: AIRFLOW-3542 URL: https://issues.apache.org/jira/browse/AIRFLOW-3542 Project: Apache Airflow Issue Type: Bug Components: scheduler Affects Versions: 1.10.2 Reporter: Dan Davydov Assignee: Dan Davydov

next_ds is useful when you need cron-style scheduling: a task that runs for date "X" uses that date in its logic, e.g. to send an email telling users that the run that was supposed to run for date "X" has completed. The problem is that next_ds does not behave as expected for manually triggered runs. The original issue illustrated this with ASCII timing diagrams (garbled in this archive); their captions convey the point:

Using execution_date in a task:
- Scheduled Run (works as expected): start_date1 follows execution_date1 by one scheduling_interval.
- Manual Run (works as expected): triggered_date, execution_date, and start_date all coincide.

Using next_ds in a task:
- Scheduled Run (works as expected): next_ds1 coincides with start_date1, next_ds2 with start_date2, one scheduling_interval apart.
- Manual Run (broken): next_ds1 is expected to match triggered_date, just as execution_date does for the manually triggered run above, but instead it falls anywhere from 0 to one scheduling_interval after it, depending on when the next execution date is.

Proposal: have next_ds always set to execution_date for manually triggered runs, instead of the next schedule-interval-aligned execution date. This _might_ break backwards compatibility for some users, but it can be argued that the current behavior is a bug. If it's really desired, we can create new aliases that behave logically, although I am against this. prev_ds should probably also be made consistent with this logic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
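The proposed semantics can be sketched as a small helper. This is an illustration only, not Airflow's actual macro code; the function and parameter names are hypothetical, and a timedelta-based schedule interval is assumed:

```python
from datetime import datetime, timedelta

def compute_next_ds(execution_date, schedule_interval, manually_triggered):
    """Hypothetical sketch of the proposed next_ds semantics.

    For scheduled runs, next_ds stays the next schedule-aligned date;
    for manually triggered runs it is simply the execution_date itself,
    matching the triggered date.
    """
    if manually_triggered:
        return execution_date.strftime("%Y-%m-%d")
    return (execution_date + schedule_interval).strftime("%Y-%m-%d")
```

Under this sketch, a run triggered manually on 2018-12-18 sees next_ds of "2018-12-18" rather than the next interval boundary.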
[jira] [Updated] (AIRFLOW-3233) Dag deletion in the UI doesn't work
[ https://issues.apache.org/jira/browse/AIRFLOW-3233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-3233: - Affects Version/s: 1.10.0 > Dag deletion in the UI doesn't work > --- > > Key: AIRFLOW-3233 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3233 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: 1.10.0 > Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > > Dag deletion in the UI doesn't work, DAGs can only be deleted if the DAG > doesn't exist in the DAGBag, but if the DAG doesn't exist in the DAGBag the > deletion URL gets passed an empty DAG id. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3233) Dag deletion in the UI doesn't work
Dan Davydov created AIRFLOW-3233: Summary: Dag deletion in the UI doesn't work Key: AIRFLOW-3233 URL: https://issues.apache.org/jira/browse/AIRFLOW-3233 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov Assignee: Dan Davydov Dag deletion in the UI doesn't work: DAGs can only be deleted if the DAG doesn't exist in the DAGBag, but in that case the deletion URL gets passed an empty DAG id. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-3191) Fix not being able to specify execution_date when creating dagrun
[ https://issues.apache.org/jira/browse/AIRFLOW-3191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-3191: - Description: Creating a dagrun using the flask web UI is not possible because execution_date is hidden. This is because the column type is actually DATETIME where Flask only has a converter for DateTime (it's the same type, it's a casing issue). Fix is to add a form override: form_overrides = dict(execution_date=DateTimeField) Only applies to the non-rbac webserver. was: Creating a dagrun using the flask web UI is not possible because execution_date is hidden. This is because the column type is actually DATETIME where Flask only has a converter for DateTime (it's the same type, it's a casing issue). Fix is to add a form override: form_overrides = dict(execution_date=DateTimeField) > Fix not being able to specify execution_date when creating dagrun > - > > Key: AIRFLOW-3191 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3191 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: 1.10.0 > Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > > Creating a dagrun using the flask web UI is not possible because > execution_date is hidden. This is because the column type is actually > DATETIME where Flask only has a converter for DateTime (it's the same type, > it's a casing issue). > Fix is to add a form override: form_overrides = > dict(execution_date=DateTimeField) > Only applies to the non-rbac webserver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3191) Can not specify execution_date when creating dagrun
Dan Davydov created AIRFLOW-3191: Summary: Can not specify execution_date when creating dagrun Key: AIRFLOW-3191 URL: https://issues.apache.org/jira/browse/AIRFLOW-3191 Project: Apache Airflow Issue Type: Bug Components: webserver Affects Versions: 1.10.0 Reporter: Dan Davydov Assignee: Dan Davydov Creating a dagrun using the Flask web UI is not possible because execution_date is hidden. This is because the column type is actually DATETIME, whereas Flask only has a converter for DateTime (it's the same type; it's a casing issue). The fix is to add a form override: form_overrides = dict(execution_date=DateTimeField) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-3191) Fix not being able to specify execution_date when creating dagrun
[ https://issues.apache.org/jira/browse/AIRFLOW-3191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-3191: - Summary: Fix not being able to specify execution_date when creating dagrun (was: Can not specify execution_date when creating dagrun) > Fix not being able to specify execution_date when creating dagrun > - > > Key: AIRFLOW-3191 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3191 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: 1.10.0 > Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > > Creating a dagrun using the flask web UI is not possible because > execution_date is hidden. This is because the column type is actually > DATETIME where Flask only has a converter for DateTime (it's the same type, > it's a casing issue). > Fix is to add a form override: form_overrides = > dict(execution_date=DateTimeField) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
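The one-line fix can be illustrated with a self-contained sketch. Everything here except the `form_overrides = dict(execution_date=DateTimeField)` line is a stand-in (the real code lives in a Flask-Admin ModelView and uses wtforms' DateTimeField); the sketch only shows why an explicit override makes the hidden field render:

```python
class DateTimeField:
    """Stand-in for wtforms.fields.DateTimeField (illustration only)."""

class FakeModelView:
    """Stand-in for a Flask-Admin ModelView's form generation."""
    # Converter lookup is by exact type name, so "DATETIME" != "DateTime"
    # and the column is silently hidden from the form.
    known_converters = {"DateTime"}
    form_overrides = {}

    @classmethod
    def field_is_rendered(cls, column_name, column_type_name):
        # A column renders if a converter matches its type name, or if an
        # explicit form override forces a field type for it.
        return (column_type_name in cls.known_converters
                or column_name in cls.form_overrides)

class DagRunView(FakeModelView):
    # The fix from the issue: force execution_date to render as a
    # DateTimeField regardless of the DATETIME/DateTime casing mismatch.
    form_overrides = dict(execution_date=DateTimeField)
```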
[jira] [Commented] (AIRFLOW-3160) Load latest_dagruns asynchronously
[ https://issues.apache.org/jira/browse/AIRFLOW-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639886#comment-16639886 ] Dan Davydov commented on AIRFLOW-3160: -- Latency is still an issue if the DB is not collocated (e.g. 50 sequential DB calls); batching the queries is, at the least, definitely necessary. > Load latest_dagruns asynchronously > --- > > Key: AIRFLOW-3160 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3160 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Affects Versions: 1.10.0 > Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > > The front page loads very slowly when the DB has latency because one blocking > query is made per DAG against the DB. > > The latest dagruns should be loaded asynchronously and in batch like the > other UI elements that query the database. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3160) Load latest_dagruns asynchronously
Dan Davydov created AIRFLOW-3160: Summary: Load latest_dagruns asynchronously Key: AIRFLOW-3160 URL: https://issues.apache.org/jira/browse/AIRFLOW-3160 Project: Apache Airflow Issue Type: Improvement Components: webserver Affects Versions: 1.10.0 Reporter: Dan Davydov Assignee: Dan Davydov The front page loads very slowly when the DB has latency because one blocking query is made per DAG against the DB. The latest dagruns should be loaded asynchronously and in batch like the other UI elements that query the database. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
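The batching half of the fix can be sketched in pure Python: one round trip fetches (dag_id, execution_date) rows for all DAGs and the client groups them, instead of issuing one blocking query per DAG. Names here are illustrative, not Airflow's actual code:

```python
def latest_run_per_dag(rows):
    """Group rows from a single batched query into {dag_id: latest execution_date}.

    rows: iterable of (dag_id, execution_date) pairs, e.g. the result of one
    "SELECT dag_id, execution_date FROM dag_run WHERE dag_id IN (...)" round
    trip, rather than 50 sequential per-DAG queries.
    """
    latest = {}
    for dag_id, execution_date in rows:
        # Keep only the most recent run seen for each DAG.
        if dag_id not in latest or execution_date > latest[dag_id]:
            latest[dag_id] = execution_date
    return latest
```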
[jira] [Created] (AIRFLOW-3050) Deactivate Stale DAGs in the Main Scheduler Loop
Dan Davydov created AIRFLOW-3050: Summary: Deactivate Stale DAGs in the Main Scheduler Loop Key: AIRFLOW-3050 URL: https://issues.apache.org/jira/browse/AIRFLOW-3050 Project: Apache Airflow Issue Type: Improvement Components: scheduler Reporter: Dan Davydov Assignee: Dan Davydov Currently there is logic to deactivate stale DAGs in the scheduler, but it is only run after the scheduler completes. Since most companies run the scheduler in a continuous loop, this code never runs. This code should be moved into the main scheduler loop. This would largely obviate the UI/API endpoints for deleting DAGs. Mailing List Thread On This Topic: https://lists.apache.org/thread.html/8ac996a69bfa8214e5f56f2851cea349e0d83dcbb0d9e5af010525b9@%3Cdev.airflow.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2895) Prevent scheduler from spamming heartbeats/logs
Dan Davydov created AIRFLOW-2895: Summary: Prevent scheduler from spamming heartbeats/logs Key: AIRFLOW-2895 URL: https://issues.apache.org/jira/browse/AIRFLOW-2895 Project: Apache Airflow Issue Type: Bug Components: scheduler Reporter: Dan Davydov Assignee: Dan Davydov There seem to be a couple of problems with [https://github.com/apache/incubator-airflow/pull/2986] that cause the sleep not to trigger and scheduler heartbeating/logs to be spammed:
1. If all of the files are being processed in the queue, there is no sleep (can be fixed by sleeping for min_sleep even if there are no files).
2. I have heard reports that some files can return a parsing time that is monotonically increasing for some reason (e.g. a file actually parses in 1s each loop, but the reported duration seems to use the very first time the file was parsed as the start time instead of the last time). I haven't confirmed this, but it sounds problematic.
To unblock the release I'm reverting this PR for now. It should be re-added with tests/mocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2810] Fix typo in Xcom model timestamp
Repository: incubator-airflow Updated Branches: refs/heads/master 233056e0d -> a338f3276 [AIRFLOW-2810] Fix typo in Xcom model timestamp Fix typo in Xcom model timestamp field No new testing - the field is already represented in migrations Closes #3652 from andywilco/fix_datetime_typo Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a338f327 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a338f327 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a338f327 Branch: refs/heads/master Commit: a338f3276835af45765d24a6e6d43ad4ba4d66ba Parents: 233056e Author: Andy Wilcox Authored: Fri Jul 27 13:03:30 2018 -0400 Committer: Dan Davydov Committed: Fri Jul 27 13:03:36 2018 -0400 -- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a338f327/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 1d832ab..cf7eb0a 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4482,7 +4482,7 @@ class XCom(Base, LoggingMixin): key = Column(String(512)) value = Column(LargeBinary) timestamp = Column( -DateTime, default=timezone.utcnow, nullable=False) +UtcDateTime, default=timezone.utcnow, nullable=False) execution_date = Column(UtcDateTime, nullable=False) # source information
[jira] [Created] (AIRFLOW-2422) Batch manage_slas queries
Dan Davydov created AIRFLOW-2422: Summary: Batch manage_slas queries Key: AIRFLOW-2422 URL: https://issues.apache.org/jira/browse/AIRFLOW-2422 Project: Apache Airflow Issue Type: Bug Components: scheduler Reporter: Dan Davydov Assignee: Dan Davydov If there are too many SLA entries that need to be inserted in the scheduler loop, the scheduler errors out if the connection times out. The SLA queries to Mysql need to be batched. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
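A minimal sketch of the batching idea (illustrative names; the real fix would batch the inserts inside the scheduler's SLA handling): split the pending SLA rows into fixed-size chunks and commit one chunk per round trip, so no single statement outlives the connection timeout.

```python
def chunked(items, batch_size):
    """Yield successive fixed-size batches from a list of pending rows."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

def insert_sla_misses(session, sla_rows, batch_size=1000):
    # Hypothetical helper: insert SLA-miss rows in batches instead of one
    # huge multi-row insert that can hit the MySQL connection timeout.
    # bulk_save_objects is SQLAlchemy's batched-persist API; any session
    # object with the same two methods works for the sketch.
    for batch in chunked(sla_rows, batch_size):
        session.bulk_save_objects(batch)  # one round trip per batch
        session.commit()
```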
incubator-airflow git commit: [Airflow-2202] Add filter support in HiveMetastoreHook().max_partition()
Repository: incubator-airflow Updated Branches: refs/heads/master 133e249e0 -> 9a315efc7 [Airflow-2202] Add filter support in HiveMetastoreHook().max_partition() Adding back support for filter in max_partition() which could be used by some valid use cases. It will work for tables with multiple partitions, which is the behavior before (tho the doc stated it only works for single partitioned table). This change also kept the behavior when trying to get max partition on a sub-partitioned table without supplying filter--it will return the max partition value of the partition key even it is not unique. Some extra checks are added to provide more meaningful exception messages. Closes #3117 from yrqls21/kevin_yang_add_filter Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a315efc Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a315efc Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a315efc Branch: refs/heads/master Commit: 9a315efc797da3936ef0ee9065307b1e02cab43a Parents: 133e249 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Fri Mar 16 18:01:48 2018 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Mar 16 18:01:54 2018 -0700 -- airflow/hooks/hive_hooks.py | 109 - airflow/macros/hive.py| 21 --- tests/hooks/test_hive_hook.py | 57 +++ 3 files changed, 129 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a315efc/airflow/hooks/hive_hooks.py -- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 128be41..7b5ff10 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -614,62 +614,95 @@ class HiveMetastoreHook(BaseHook): return [dict(zip(pnames, p.values)) for p in parts] @staticmethod -def _get_max_partition_from_part_names(part_names, key_name): -""" -Helper method to get max partition from part names. 
Works only -when partition format follows '{key}={value}' and key_name is name of -the only partition key. -:param part_names: list of partition names -:type part_names: list -:param key_name: partition key name -:type key_name: str -:return: Max partition or None if part_names is empty. -""" -if not part_names: +def _get_max_partition_from_part_specs(part_specs, partition_key, filter_map): +""" +Helper method to get max partition of partitions with partition_key +from part specs. key:value pair in filter_map will be used to +filter out partitions. + +:param part_specs: list of partition specs. +:type part_specs: list +:param partition_key: partition key name. +:type partition_key: string +:param filter_map: partition_key:partition_value map used for partition filtering, + e.g. {'key1': 'value1', 'key2': 'value2'}. + Only partitions matching all partition_key:partition_value + pairs will be considered as candidates of max partition. +:type filter_map: map +:return: Max partition or None if part_specs is empty. +""" +if not part_specs: return None -prefix = key_name + '=' -prefix_len = len(key_name) + 1 -max_val = None -for part_name in part_names: -if part_name.startswith(prefix): -if max_val is None: -max_val = part_name[prefix_len:] -else: -max_val = max(max_val, part_name[prefix_len:]) -else: -raise AirflowException( -"Partition name mal-formatted: {}".format(part_name)) -return max_val +# Assuming all specs have the same keys. +if partition_key not in part_specs[0].keys(): +raise AirflowException("Provided partition_key {} " + "is not in part_specs.".format(partition_key)) -def max_partition(self, schema, table_name, field=None): +if filter_map and not set(filter_map.keys()) < set(part_specs[0].keys()): +raise AirflowException("Keys in provided filter_map {} " + "are not subset of part_spec keys: {}" + .format(', '.join(filter_map.keys()), + ', '.join(part_specs[0
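Since the diff above is truncated by the archive, here is a self-contained rendition of the helper's documented contract — a reconstruction under stated assumptions, not the committed code (ValueError stands in for AirflowException, and the strict-subset check mirrors the visible part of the diff):

```python
def max_partition_from_part_specs(part_specs, partition_key, filter_map=None):
    """Max value of partition_key among part specs matching filter_map.

    part_specs: list of dicts like {'ds': '2015-01-01', 'hr': '00'}.
    Only specs matching every key:value pair in filter_map are candidates.
    Returns the max partition_key value, or None if nothing matches.
    """
    if not part_specs:
        return None
    # Assuming all specs have the same keys.
    if partition_key not in part_specs[0]:
        raise ValueError(
            "Provided partition_key {} is not in part_specs.".format(partition_key))
    if filter_map and not set(filter_map) < set(part_specs[0]):
        raise ValueError(
            "Keys in provided filter_map are not a subset of part_spec keys.")
    filter_map = filter_map or {}
    candidates = [spec[partition_key] for spec in part_specs
                  if all(spec.get(k) == v for k, v in filter_map.items())]
    return max(candidates) if candidates else None
```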
incubator-airflow git commit: [AIRFLOW-2150] Use lighter call in HiveMetastoreHook().max_partition()
Repository: incubator-airflow Updated Branches: refs/heads/master 0f9f4605f -> b8c2cea36 [AIRFLOW-2150] Use lighter call in HiveMetastoreHook().max_partition() Call self.metastore.get_partition_names() instead of self.metastore.get_partitions(), which is extremely expensive for large tables, in HiveMetastoreHook().max_partition(). Closes #3082 from yrqls21/kevin_yang_fix_hive_max_partition Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b8c2cea3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b8c2cea3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b8c2cea3 Branch: refs/heads/master Commit: b8c2cea36299d6a3264d8bb1dc5a3995732b8855 Parents: 0f9f460 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Mar 7 16:12:14 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Mar 7 16:12:18 2018 -0800 -- airflow/hooks/hive_hooks.py | 64 ++ airflow/macros/hive.py| 2 +- tests/hooks/test_hive_hook.py | 39 +++ 3 files changed, 91 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8c2cea3/airflow/hooks/hive_hooks.py -- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index cd7319d..128be41 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -429,9 +429,11 @@ class HiveCliHook(BaseHook): class HiveMetastoreHook(BaseHook): - """ Wrapper to interact with the Hive Metastore""" +# java short max val +MAX_PART_COUNT = 32767 + def __init__(self, metastore_conn_id='metastore_default'): self.metastore_conn = self.get_connection(metastore_conn_id) self.metastore = self.get_metastore_client() @@ -601,16 +603,46 @@ class HiveMetastoreHook(BaseHook): if filter: parts = self.metastore.get_partitions_by_filter( db_name=schema, tbl_name=table_name, -filter=filter, max_parts=32767) +filter=filter, max_parts=HiveMetastoreHook.MAX_PART_COUNT) else: 
parts = self.metastore.get_partitions( -db_name=schema, tbl_name=table_name, max_parts=32767) +db_name=schema, tbl_name=table_name, +max_parts=HiveMetastoreHook.MAX_PART_COUNT) self.metastore._oprot.trans.close() pnames = [p.name for p in table.partitionKeys] return [dict(zip(pnames, p.values)) for p in parts] -def max_partition(self, schema, table_name, field=None, filter=None): +@staticmethod +def _get_max_partition_from_part_names(part_names, key_name): +""" +Helper method to get max partition from part names. Works only +when partition format follows '{key}={value}' and key_name is name of +the only partition key. +:param part_names: list of partition names +:type part_names: list +:param key_name: partition key name +:type key_name: str +:return: Max partition or None if part_names is empty. +""" +if not part_names: +return None + +prefix = key_name + '=' +prefix_len = len(key_name) + 1 +max_val = None +for part_name in part_names: +if part_name.startswith(prefix): +if max_val is None: +max_val = part_name[prefix_len:] +else: +max_val = max(max_val, part_name[prefix_len:]) +else: +raise AirflowException( +"Partition name mal-formatted: {}".format(part_name)) +return max_val + +def max_partition(self, schema, table_name, field=None): """ Returns the maximum value for all partitions in a table. Works only for tables that have a single partition key. For subpartitioned @@ -621,17 +653,23 @@ class HiveMetastoreHook(BaseHook): >>> hh.max_partition(schema='airflow', table_name=t) '2015-01-01' """ -parts = self.get_partitions(schema, table_name, filter) -if not parts: -return None -elif len(parts[0]) == 1: -field = list(parts[0].keys())[0] -elif not field: +self.metastore._oprot.trans.open() +table = self.metastore.get_table(dbname=schema, tbl_name=table_name) +if len(table.partitionKeys) != 1: raise AirflowException( -"Please speci
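The `_get_max_partition_from_part_names` helper introduced in this commit is easy to exercise standalone. Here is a runnable rendition of the same logic (ValueError stands in for AirflowException to keep the sketch dependency-free):

```python
def max_partition_from_part_names(part_names, key_name):
    """Max partition value from names like 'ds=2015-01-01'.

    Works only when the partition format is '{key}={value}' and key_name
    is the name of the only partition key. Returns None for empty input.
    """
    if not part_names:
        return None
    prefix = key_name + '='
    max_val = None
    for part_name in part_names:
        if not part_name.startswith(prefix):
            # AirflowException in the real code.
            raise ValueError("Partition name mal-formatted: {}".format(part_name))
        value = part_name[len(prefix):]
        max_val = value if max_val is None else max(max_val, value)
    return max_val
```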
[jira] [Reopened] (AIRFLOW-226) Create separate pip packages for webserver and hooks
[ https://issues.apache.org/jira/browse/AIRFLOW-226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov reopened AIRFLOW-226: - > Create separate pip packages for webserver and hooks > > > Key: AIRFLOW-226 > URL: https://issues.apache.org/jira/browse/AIRFLOW-226 > Project: Apache Airflow > Issue Type: Improvement > Reporter: Dan Davydov >Priority: Minor > > There are users who want only the airflow hooks, and others who may not need > the front-end. The hooks and webserver should be moved into their own > packages, with the current airflow package depending on these packages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-226) Create separate pip packages for webserver and hooks
[ https://issues.apache.org/jira/browse/AIRFLOW-226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387016#comment-16387016 ] Dan Davydov commented on AIRFLOW-226: - I feel strongly (at least for hooks) that they should be moved out. Things like storing secrets in the Airflow database, hooks, etc. are convenient, but they are equivalent to plugins and should have their own owners and maintainers. It doesn't make sense to require, say, the owner and expert of the HiveHook to be a committer in this repo, but they certainly should be a committer and maintainer of a HiveHook repo. Another reason to decouple hooks from the core is that it doesn't scale for the Airflow committers to support backwards-incompatible changes to all operators; we are effectively supporting many hooks in which we have no domain knowledge. Other systems such as Jenkins follow a similar plugin framework. > Create separate pip packages for webserver and hooks > > > Key: AIRFLOW-226 > URL: https://issues.apache.org/jira/browse/AIRFLOW-226 > Project: Apache Airflow > Issue Type: Improvement > Reporter: Dan Davydov >Priority: Minor > > There are users who want only the airflow hooks, and others who may not need > the front-end. The hooks and webserver should be moved into their own > packages, with the current airflow package depending on these packages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2173) Don't check task IDs for concurrency reached check
Dan Davydov created AIRFLOW-2173: Summary: Don't check task IDs for concurrency reached check Key: AIRFLOW-2173 URL: https://issues.apache.org/jira/browse/AIRFLOW-2173 Project: Apache Airflow Issue Type: Bug Components: core Reporter: Dan Davydov Assignee: Dan Davydov Currently the concurrency-reached check filters the DB query to include only tasks that are present in the currently parsed DAG. For sufficiently large DAGs with many tasks, this causes MySQL to use an inefficient query plan and can put a lot of load on the database. Since there is no good reason to omit old running tasks in a DAG from the concurrency count, this filter should simply be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2156) Parallelize Celery Executor
Dan Davydov created AIRFLOW-2156: Summary: Parallelize Celery Executor Key: AIRFLOW-2156 URL: https://issues.apache.org/jira/browse/AIRFLOW-2156 Project: Apache Airflow Issue Type: Improvement Components: celery Reporter: Dan Davydov Assignee: Dan Davydov The CeleryExecutor doesn't currently check task states in parallel, since Celery does not support this. This can greatly slow down the scheduler loop, since each request to check a task's state is a network request. The Celery Executor should parallelize these requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
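One way to realize this, sketched under the assumption that each state lookup is an independent network round trip (an illustration, not the committed Airflow change): fan the lookups out over a thread pool so the scheduler loop waits for the slowest call rather than the sum of all calls.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_task_states(async_results, max_workers=32):
    """Fetch Celery AsyncResult states concurrently instead of sequentially.

    async_results: objects exposing a .state attribute, whose read may
    involve a network round trip to the result backend. Results are
    returned in the same order as the inputs.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda result: result.state, async_results))
```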
[jira] [Resolved] (AIRFLOW-908) Airflow run should print worker name at top of log
[ https://issues.apache.org/jira/browse/AIRFLOW-908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-908. - Resolution: Fixed > Airflow run should print worker name at top of log > -- > > Key: AIRFLOW-908 > URL: https://issues.apache.org/jira/browse/AIRFLOW-908 > Project: Apache Airflow > Issue Type: New Feature > Reporter: Dan Davydov > Assignee: Dan Davydov >Priority: Major > Labels: beginner, starter > > Airflow run should log the worker hostname at top of log. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-1985] Impersonation fixes for using `run_as_user`
Repository: incubator-airflow Updated Branches: refs/heads/master 6f00f722b -> a0deb506c [AIRFLOW-1985] Impersonation fixes for using `run_as_user` Changes to propagate config in subdags, and make sure tasks running have the config. Making sure all tasks have the proper configuration by copying it into a temporary file. BashOperators should have the context of the AIRFLOW_HOME and PYTHONPATH as other tasks have. Closes #2991 from edgarRd/erod-impersonation-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a0deb506 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a0deb506 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a0deb506 Branch: refs/heads/master Commit: a0deb506c070637abc3c426bc7d060e3fe6c854d Parents: 6f00f72 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Wed Feb 7 14:33:36 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Feb 7 14:33:40 2018 -0800 -- airflow/executors/base_executor.py | 11 - airflow/jobs.py | 10 +++- airflow/models.py| 2 + airflow/operators/bash_operator.py | 24 +++-- airflow/task/task_runner/base_task_runner.py | 32 +--- airflow/utils/configuration.py | 45 + airflow/utils/log/file_task_handler.py | 2 +- tests/dags/test_impersonation_subdag.py | 59 +++ tests/impersonation.py | 10 9 files changed, 167 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a0deb506/airflow/executors/base_executor.py -- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index d3d0675..d606057 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -58,8 +58,14 @@ class BaseExecutor(LoggingMixin): ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=None): +pool=None, +cfg_path=None): pool = pool or task_instance.pool + +# TODO (edgarRd): 
AIRFLOW-1985: +# cfg_path is needed to propagate the config values if using impersonation +# (run_as_user), given that there are different code paths running tasks. +# For a long term solution we need to address AIRFLOW-1986 command = task_instance.command( local=True, mark_success=mark_success, @@ -68,7 +74,8 @@ class BaseExecutor(LoggingMixin): ignore_task_deps=ignore_task_deps, ignore_ti_state=ignore_ti_state, pool=pool, -pickle_id=pickle_id) +pickle_id=pickle_id, +cfg_path=cfg_path) self.queue_command( task_instance, command, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a0deb506/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index ccb22cb..172792d 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -58,6 +58,7 @@ from airflow.utils.db import create_session, provide_session from airflow.utils.email import send_email from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter from airflow.utils.state import State +from airflow.utils.configuration import tmp_configuration_copy Base = models.Base ID_LEN = models.ID_LEN @@ -2234,13 +2235,20 @@ class BackfillJob(BaseJob): # Skip scheduled state, we are executing immediately ti.state = State.QUEUED session.merge(ti) + +cfg_path = None +if executor.__class__ in (executors.LocalExecutor, + executors.SequentialExecutor): +cfg_path = tmp_configuration_copy() + executor.queue_task_instance( ti, mark_success=self.mark_success, pickle_id=pickle_id, ignore_task_deps=self.ignore_task_deps, ignore_depends_on_past=ignore_depends_on_past, -pool=self.pool) +pool=self.pool, +cfg_path=cfg_path) ti
[jira] [Created] (AIRFLOW-2061) Remove expensive redundant async.state calls in celery executor
Dan Davydov created AIRFLOW-2061: Summary: Remove expensive redundant async.state calls in celery executor Key: AIRFLOW-2061 URL: https://issues.apache.org/jira/browse/AIRFLOW-2061 Project: Apache Airflow Issue Type: Improvement Components: executor Reporter: Dan Davydov Assignee: Dan Davydov The Celery Executor makes the expensive async.state call multiple times, when it could reuse the result of the initial async.state call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
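The fix amounts to reading `.state` once per sync and branching on the cached value. A sketch with illustrative names (Celery's `AsyncResult.state` is a property whose read can hit the result backend over the network):

```python
def sync_result_state(async_result):
    # Read the potentially expensive, network-backed state exactly once,
    # then branch on the cached value instead of re-reading the property.
    state = async_result.state
    if state in ("SUCCESS", "FAILURE", "REVOKED"):
        return state
    return "PENDING"
```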
[jira] [Updated] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-2027: - Description: The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this sleep to slightly speed up scheduling, and instead sleep once all files have been parsed. It can add up, since it happens in every scheduler loop, which runs (number of DAGs to parse / scheduler parallelism) times. Also remove the unnecessarily increased file-processing interval in tests, which slows them down. was: The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this sleep to slightly speed up scheduling. It can add up since it runs to every scheduler loop which runs # of dags to parse/scheduler parallelism times. Also remove the unnecessary increased file processing interval in tests which slows them down. > Only trigger sleep in scheduler after all files have parsed > --- > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler > Reporter: Dan Davydov > Assignee: Dan Davydov >Priority: Major > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling, and instead sleep once all files have > been parsed. It can add up, since it happens in every scheduler loop, which > runs (number of DAGs to parse / scheduler parallelism) times. > Also remove the unnecessarily increased file-processing interval in tests, > which slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2023] Add debug logging around number of queued files
Repository: incubator-airflow Updated Branches: refs/heads/master da0e628fa -> 61ff29e57 [AIRFLOW-2023] Add debug logging around number of queued files Add debug logging around number of queued files to process in the scheduler. This makes it easy to see when there are bottlenecks due to parallelism and how long it takes for all files to be processed. Closes #2968 from aoen/ddavydov-- add_more_scheduler_metrics Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61ff29e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61ff29e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61ff29e5 Branch: refs/heads/master Commit: 61ff29e578d1121ab4606fe122fb4e2db8f075b9 Parents: da0e628 Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Mon Jan 29 12:01:58 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Jan 29 12:02:02 2018 -0800 -- airflow/utils/dag_processing.py | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61ff29e5/airflow/utils/dag_processing.py -- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index dc0c7a6..b4abd34 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -476,6 +476,12 @@ class DagFileProcessorManager(LoggingMixin): running_processors[file_path] = processor self._processors = running_processors +self.log.debug("%s/%s scheduler processes running", + len(self._processors), self._parallelism) + +self.log.debug("%s file paths queued for processing", + len(self._file_path_queue)) + # Collect all the DAGs that were found in the processed files simple_dags = [] for file_path, processor in finished_processors.items():
[jira] [Updated] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-2027: - Summary: Only trigger sleep in scheduler after all files have parsed (was: Remove unnecessary 1s sleep in scheduler loop) > Only trigger sleep in scheduler after all files have parsed > --- > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler > Reporter: Dan Davydov > Assignee: Dan Davydov >Priority: Major > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling. It can add up since it runs on every > scheduler loop, which runs (# of DAGs to parse / scheduler parallelism) times. > > Also remove the unnecessary increased file processing interval in tests, which > slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2027) Remove unnecessary 1s sleep in scheduler loop
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341861#comment-16341861 ] Dan Davydov commented on AIRFLOW-2027: -- the PR I'm about to cut will sleep (1s - the time it took to parse DAGs), so each loop still takes at least 1s. I'm not greatly concerned about the busy looping itself, but the log spam is an issue. We have found that in high-load situations, 10% of the scheduling delay was caused by this sleep. > Remove unnecessary 1s sleep in scheduler loop > - > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler > Reporter: Dan Davydov > Assignee: Dan Davydov >Priority: Major > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling. It can add up since it runs on every > scheduler loop, which runs (# of DAGs to parse / scheduler parallelism) times. > > Also remove the unnecessary increased file processing interval in tests, which > slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2027) Remove unnecessary 1s sleep in scheduler loop
Dan Davydov created AIRFLOW-2027: Summary: Remove unnecessary 1s sleep in scheduler loop Key: AIRFLOW-2027 URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 Project: Apache Airflow Issue Type: Improvement Components: scheduler Reporter: Dan Davydov Assignee: Dan Davydov The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this sleep to slightly speed up scheduling. It can add up since it runs on every scheduler loop, which runs (# of DAGs to parse / scheduler parallelism) times. Also remove the unnecessary increased file processing interval in tests, which slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-1985) Subdag tasks do not work with impersonation
[ https://issues.apache.org/jira/browse/AIRFLOW-1985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1985: - Description: When using {{run_as_user}} for impersonation, airflow creates a subset of the configuration to make it available for the task during execution via {{base_task_runner.py}}. This behavior is consistent when triggering the subdag task. The above code path is not executed when running tasks within a subdag using the {{SequentialExecutor}}, where each task is run directly. Note that in the context of subdags, tasks running for the subdags are already running in the same context as the user, so no additional impersonation is needed, but since it's not guaranteed that the user has the right configuration settings (hence why we copy a subset of the configuration during impersonation), we need to propagate those settings for the tasks within the subdag as well. This change also requires exporting the AIRFLOW_HOME and PYTHONPATH env variables in the bash operator so that run_as_user can work if airflow operators are called from a bash operator (e.g. if a bash operator calls python code that imports airflow but the airflow user isn't the same as the run_as_user). was: When using {{run_as_user}} for impersonation, airflow creates a subset of the configuration to make it available for the task during execution via {{base_task_runner.py}}. This behavior is consistent when triggering the subdag task. The above code path is not executed when running tasks within a subdag using the {{SequentialExecutor}}, where each task is run directly.
Note that in the context of subdags, tasks running for the subdags are already running in the same context as the user, so no additional impersonation is needed, but since it's not guaranteed that the user has the right configuration settings (hence why we copy a subset of the configuration during impersonation), we need to propagate those settings for the tasks within the subdag as well. > Subdag tasks do not work with impersonation > --- > > Key: AIRFLOW-1985 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1985 > Project: Apache Airflow > Issue Type: Bug >Reporter: Edgar Rodriguez >Assignee: Edgar Rodriguez >Priority: Major > > When using {{run_as_user}} for impersonation, airflow creates a subset of the > configuration to make it available for the task during execution via > {{base_task_runner.py}}. This behavior is consistent when triggering the > subdag task. > The above code path is not executed when running tasks within a subdag using > the {{SequentialExecutor}}, where each task is run directly. > Note that in the context of subdags, tasks running for the subdags are > already running in the same context as the user, so no additional > impersonation is needed, but since it's not guaranteed that the user has the > right configuration settings (hence why we copy a subset of the configuration > during impersonation), we need to propagate those settings for the tasks > within the subdag as well. > This change also requires exporting the AIRFLOW_HOME and PYTHONPATH env variables > in the bash operator so that run_as_user can work if airflow operators are called > from a bash operator (e.g. if a bash operator calls python code that imports > airflow but the airflow user isn't the same as the run_as_user). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
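The mechanism described in the issue can be sketched as follows. This is an illustrative approximation with made-up helper names — the real logic lives in {{base_task_runner.py}} — showing the two pieces: copy only a whitelisted subset of configuration sections for the impersonated process, and export the env vars the {{run_as_user}} subprocess needs so that {{import airflow}} works.

```python
import json
import os
from tempfile import mkstemp

# Illustrative whitelist of config sections copied for the impersonated
# task; 'hive' was later added by AIRFLOW-1971.
COPIED_SECTIONS = ('core', 'smtp', 'scheduler', 'webserver', 'hive')


def subset_config(cfg_dict):
    """Keep only the sections the impersonated task is allowed to see."""
    return {section: cfg_dict.get(section, {}) for section in COPIED_SECTIONS}


def write_task_config(cfg_dict):
    """Write the config subset to a temp file readable by the subprocess."""
    temp_fd, cfg_path = mkstemp()
    with os.fdopen(temp_fd, 'w') as f:
        json.dump(subset_config(cfg_dict), f)
    return cfg_path


def run_as_user_env(airflow_home, pythonpath):
    """Env for the impersonated subprocess: without AIRFLOW_HOME and
    PYTHONPATH exported, importing airflow can fail under run_as_user."""
    env = dict(os.environ)
    env['AIRFLOW_HOME'] = airflow_home
    env['PYTHONPATH'] = pythonpath
    return env
```

The point of the fix is that the same subset-and-propagate step must also run for the tasks inside a subdag, not only for the subdag task itself.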
[jira] [Updated] (AIRFLOW-1980) Scheduler DB Queries Should be Batched
[ https://issues.apache.org/jira/browse/AIRFLOW-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1980: - Description: DB queries in the scheduler should be batched, since the larger ones can get quite big and cause lock wait timeouts. One example is the queries in the _change_state_for_tis_without_dagrun function in jobs.py The orphan task cleanup is another such piece of code. was: DB queries in the scheduler should be batched, since the larger ones can get quite big and cause lock wait timeouts. One example is the queries in the _change_state_for_tis_without_dagrun function in jobs.py > Scheduler DB Queries Should be Batched > -- > > Key: AIRFLOW-1980 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1980 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler > Reporter: Dan Davydov > > DB queries in the scheduler should be batched, since the larger ones can get > quite big and cause lock wait timeouts. > One example is the queries in the _change_state_for_tis_without_dagrun > function in jobs.py > The orphan task cleanup is another such piece of code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (AIRFLOW-1980) Scheduler DB Queries Should be Batched
[ https://issues.apache.org/jira/browse/AIRFLOW-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1980: - Description: DB queries in the scheduler should be batched, since the larger ones can get quite big and cause lock wait timeouts. One example is the queries in the _change_state_for_tis_without_dagrun function in jobs.py The orphan task cleanup is another such piece of code that should have the queries done in batch. was: DB queries in the scheduler should be batched, since the larger ones can get quite big and cause lock wait timeouts. One example is the queries in the _change_state_for_tis_without_dagrun function in jobs.py The orphan task cleanup is another such piece of code. > Scheduler DB Queries Should be Batched > -- > > Key: AIRFLOW-1980 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1980 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler > Reporter: Dan Davydov > > DB queries in the scheduler should be batched, since the larger ones can get > quite big and cause lock wait timeouts. > One example is the queries in the _change_state_for_tis_without_dagrun > function in jobs.py > The orphan task cleanup is another such piece of code that should have the > queries done in batch. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1980) Scheduler DB Queries Should be Batched
Dan Davydov created AIRFLOW-1980: Summary: Scheduler DB Queries Should be Batched Key: AIRFLOW-1980 URL: https://issues.apache.org/jira/browse/AIRFLOW-1980 Project: Apache Airflow Issue Type: Bug Components: scheduler Reporter: Dan Davydov DB queries in the scheduler should be batched, since the larger ones can get quite big and cause lock wait timeouts. One example is the queries in the _change_state_for_tis_without_dagrun function in jobs.py -- This message was sent by Atlassian JIRA (v6.4.14#64029)
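The batching idea can be sketched with a couple of hypothetical helpers (not the actual jobs.py code): split the task-instance keys into fixed-size chunks so each statement stays bounded and holds row locks only briefly, instead of issuing one huge query that can trigger lock wait timeouts.

```python
def chunks(items, batch_size):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


def change_state_in_batches(execute_update, ti_keys, new_state, batch_size=500):
    """Issue one bounded statement per chunk rather than a single huge one.

    execute_update is a stand-in for something like
      UPDATE task_instance SET state = :state
      WHERE (dag_id, task_id, execution_date) IN (:batch)
    run once per batch of keys.
    """
    for batch in chunks(ti_keys, batch_size):
        execute_update(new_state, batch)
```

The same chunking would apply to the orphan-task cleanup queries mentioned in the issue.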
incubator-airflow git commit: [AIRFLOW-1971] Propagate hive config on impersonation
Repository: incubator-airflow Updated Branches: refs/heads/master e46cde418 -> e9c1ac588 [AIRFLOW-1971] Propagate hive config on impersonation Currently, if hive specific settings are defined in the configuration file, they are not being propagated when using impersonation. We need to propagate this configuration down to the impersonated process. Closes #2920 from edgarRd/erod-propagate-hive-conf Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9c1ac58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9c1ac58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9c1ac58 Branch: refs/heads/master Commit: e9c1ac588a698b88f916d6f47531d7e0dc63237d Parents: e46cde4 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Mon Jan 8 11:25:12 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Jan 8 11:25:16 2018 -0800 -- airflow/task/task_runner/base_task_runner.py | 1 + tests/dags/test_impersonation_custom.py | 10 ++ 2 files changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9c1ac58/airflow/task/task_runner/base_task_runner.py -- diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index 664a873..11a4745 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -64,6 +64,7 @@ class BaseTaskRunner(LoggingMixin): 'smtp': cfg_dict.get('smtp', {}), 'scheduler': cfg_dict.get('scheduler', {}), 'webserver': cfg_dict.get('webserver', {}), +'hive': cfg_dict.get('hive', {}), # we should probably generalized this } temp_fd, cfg_path = mkstemp() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9c1ac58/tests/dags/test_impersonation_custom.py -- diff --git a/tests/dags/test_impersonation_custom.py b/tests/dags/test_impersonation_custom.py index 6f35b38..b6dd9c8 100644 --- 
a/tests/dags/test_impersonation_custom.py +++ b/tests/dags/test_impersonation_custom.py @@ -41,7 +41,17 @@ def print_today(): print('Today is {}'.format(dt.strftime('%Y-%m-%d'))) +def check_hive_conf(): +from airflow import configuration as conf +assert conf.get('hive', 'default_hive_mapred_queue') == 'airflow' + + PythonOperator( python_callable=print_today, task_id='exec_python_fn', dag=dag) + +PythonOperator( +python_callable=check_hive_conf, +task_id='exec_check_hive_conf_fn', +dag=dag)
incubator-airflow git commit: [AIRFLOW-1963] Add config for HiveOperator mapred_queue
Repository: incubator-airflow Updated Branches: refs/heads/master 07c2a515e -> b3489b99e [AIRFLOW-1963] Add config for HiveOperator mapred_queue Adding configuration setting for specifying a default mapred_queue for hive jobs using the HiveOperator. Closes #2915 from edgarRd/erod-hive-mapred-queue- config Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3489b99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3489b99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3489b99 Branch: refs/heads/master Commit: b3489b99e9140e25bdd08b78f57ba845c3edb358 Parents: 07c2a51 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Wed Jan 3 14:23:09 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Jan 3 14:23:14 2018 -0800 -- airflow/config_templates/default_airflow.cfg | 4 +- airflow/config_templates/default_test.cfg| 3 + airflow/hooks/hive_hooks.py | 4 +- airflow/operators/hive_operator.py | 19 -- scripts/ci/airflow_travis.cfg| 3 + tests/operators/hive_operator.py | 82 ++- 6 files changed, 77 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a7a3b7d..d0dfb72 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -170,6 +170,9 @@ default_ram = 512 default_disk = 512 default_gpus = 0 +[hive] +# Default mapreduce queue for HiveOperator tasks +default_hive_mapred_queue = [webserver] # The base url of your website as airflow cannot guess what domain or @@ -458,7 +461,6 @@ keytab = airflow.keytab [github_enterprise] api_rev = v3 - [admin] # UI to hide sensitive variable fields when set to True hide_sensitive_variable_fields = True 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_test.cfg -- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 85343ee..eaf3d03 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -51,6 +51,9 @@ auth_backend = airflow.api.auth.backend.default [operators] default_owner = airflow +[hive] +default_hive_mapred_queue = airflow + [webserver] base_url = http://localhost:8080 web_server_host = 0.0.0.0 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/hooks/hive_hooks.py -- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index eb39469..3d986fd 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -25,6 +25,7 @@ import time from tempfile import NamedTemporaryFile import hive_metastore +from airflow import configuration as conf from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.utils.helpers import as_flattened_list @@ -82,7 +83,8 @@ class HiveCliHook(BaseHook): "Invalid Mapred Queue Priority. 
Valid values are: " "{}".format(', '.join(HIVE_QUEUE_PRIORITIES))) -self.mapred_queue = mapred_queue +self.mapred_queue = mapred_queue or conf.get('hive', + 'default_hive_mapred_queue') self.mapred_queue_priority = mapred_queue_priority self.mapred_job_name = mapred_job_name http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/operators/hive_operator.py -- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 221feeb..ce98544 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -77,13 +77,19 @@ class HiveOperator(BaseOperator): self.mapred_queue_priority = mapred_queue_priority self.mapred_job_name = mapred_job_name +# assigned lazily - just for consistency we can create the attribute with a +# `None` initial value, later it will be populated by the execute method. +# This also makes `on_kill` implementation consistent since it assumes `self.hook` +# is defined. +self.hook = None + def get_hook
[jira] [Commented] (AIRFLOW-511) DagRun Failure: Retry, Email, Callbacks
[ https://issues.apache.org/jira/browse/AIRFLOW-511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290485#comment-16290485 ] Dan Davydov commented on AIRFLOW-511: - You might be able to accomplish this for now using the task sla feature on a task that depends on all of the other tasks in the DAG > DagRun Failure: Retry, Email, Callbacks > --- > > Key: AIRFLOW-511 > URL: https://issues.apache.org/jira/browse/AIRFLOW-511 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Rob Froetscher >Priority: Minor > > Are there any plans to have retry, email, or callbacks on failure of a DAG > run? Would you guys be open to someone implementing that? Right now > particularly with dagrun_timeout, there is not much insight that the dag > actually stopped. > Pseudocode: > https://github.com/apache/incubator-airflow/compare/master...rfroetscher:dagrun_failure -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1819] Fix slack operator unittest bug
Repository: incubator-airflow Updated Branches: refs/heads/master d8e8f9014 -> 3c8f7747b [AIRFLOW-1819] Fix slack operator unittest bug Fix failing slack operator unittest and add test coverage. Closes #2791 from yrqls21/kevin-yang-fix-unit-test Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c8f7747 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c8f7747 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c8f7747 Branch: refs/heads/master Commit: 3c8f7747b08ca5c23233799e56a040f60e3d0fc6 Parents: d8e8f90 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Nov 15 17:52:06 2017 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Nov 15 17:52:09 2017 -0800 -- tests/operators/slack_operator.py | 16 +--- 1 file changed, 13 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c8f7747/tests/operators/slack_operator.py -- diff --git a/tests/operators/slack_operator.py b/tests/operators/slack_operator.py index 5e40648..4d6b553 100644 --- a/tests/operators/slack_operator.py +++ b/tests/operators/slack_operator.py @@ -58,6 +58,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase): } ] self.test_attachments_in_json = json.dumps(self.test_attachments) +self.test_api_params = {'key': 'value'} self.test_kwarg = 'test_kwarg' self.expected_method = 'chat.postMessage' @@ -69,7 +70,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase): 'attachments': self.test_attachments_in_json, } -def __construct_operator(self, test_token, test_slack_conn_id): +def __construct_operator(self, test_token, test_slack_conn_id, test_api_params=None): return SlackAPIPostOperator( task_id='slack', username=self.test_username, @@ -79,6 +80,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase): text=self.test_text, icon_url=self.test_icon_url, attachments=self.test_attachments, 
+api_params=test_api_params, kwarg=self.test_kwarg ) @@ -96,6 +98,14 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase): slack_hook_mock.call.assert_called_with(self.expected_method, self.expected_api_params) +slack_api_post_operator = self.__construct_operator(test_token, None, self.test_api_params) + +slack_api_post_operator.execute() + +slack_hook_class_mock.assert_called_with(token=test_token, slack_conn_id=None) + +slack_hook_mock.call.assert_called_with(self.expected_method, self.test_api_params) + @mock.patch('airflow.operators.slack_operator.SlackHook') def test_execute_with_slack_conn_id_only(self, slack_hook_class_mock): slack_hook_mock = mock.Mock() @@ -121,13 +131,13 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase): test_token = 'test_token' test_slack_conn_id = 'test_slack_conn_id' -slack_api_post_operator = self.__construct_operator(test_token, None) +slack_api_post_operator = self.__construct_operator(test_token, None, self.test_api_params) self.assertEqual(slack_api_post_operator.token, test_token) self.assertEqual(slack_api_post_operator.slack_conn_id, None) self.assertEqual(slack_api_post_operator.method, self.expected_method) self.assertEqual(slack_api_post_operator.text, self.test_text) self.assertEqual(slack_api_post_operator.channel, self.test_channel) -self.assertEqual(slack_api_post_operator.api_params, self.expected_api_params) +self.assertEqual(slack_api_post_operator.api_params, self.test_api_params) self.assertEqual(slack_api_post_operator.username, self.test_username) self.assertEqual(slack_api_post_operator.icon_url, self.test_icon_url) self.assertEqual(slack_api_post_operator.attachments, self.test_attachments)
incubator-airflow git commit: [AIRFLOW-1805] Allow Slack token to be passed through connection
Repository: incubator-airflow Updated Branches: refs/heads/master d04519e60 -> d8e8f9014 [AIRFLOW-1805] Allow Slack token to be passed through connection Allow users to pass in Slack token through connection which can provide better security. This enables user to expose token only to workers instead to both workers and schedulers. Closes #2789 from yrqls21/add_conn_supp_in_slack_op Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8e8f901 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8e8f901 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8e8f901 Branch: refs/heads/master Commit: d8e8f90142246ae5b02c1a0f9649ea5a419a5afc Parents: d04519e Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Nov 15 14:53:56 2017 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Nov 15 14:53:58 2017 -0800 -- airflow/hooks/__init__.py | 1 + airflow/hooks/slack_hook.py | 56 airflow/operators/slack_operator.py | 25 -- tests/hooks/test_slack_hook.py | 101 ++ tests/operators/slack_operator.py | 141 +++ 5 files changed, 315 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/__init__.py -- diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index 6e96e2a..6372b2f 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -55,6 +55,7 @@ _hooks = { 'dbapi_hook': ['DbApiHook'], 'mssql_hook': ['MsSqlHook'], 'oracle_hook': ['OracleHook'], +'slack_hook': ['SlackHook'], } import os as _os http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/slack_hook.py -- diff --git a/airflow/hooks/slack_hook.py b/airflow/hooks/slack_hook.py new file mode 100644 index 000..cd47573 --- /dev/null +++ b/airflow/hooks/slack_hook.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from slackclient import SlackClient +from airflow.hooks.base_hook import BaseHook +from airflow.exceptions import AirflowException + + +class SlackHook(BaseHook): +""" + Interact with Slack, using slackclient library. +""" + +def __init__(self, token=None, slack_conn_id=None): +""" +Takes both Slack API token directly and connection that has Slack API token. + +If both supplied, Slack API token will be used. + +:param token: Slack API token +:type token: string +:param slack_conn_id: connection that has Slack API token in the password field +:type slack_conn_id: string +""" +self.token = self.__get_token(token, slack_conn_id) + +def __get_token(self, token, slack_conn_id): +if token is not None: +return token +elif slack_conn_id is not None: +conn = self.get_connection(slack_conn_id) + +if not getattr(conn, 'password', None): +raise AirflowException('Missing token(password) in Slack connection') +return conn.password +else: +raise AirflowException('Cannot get token: No valid Slack token nor slack_conn_id supplied.') + +def call(self, method, api_params): +sc = SlackClient(self.token) +rc = sc.api_call(method, **api_params) + +if not rc['ok']: +msg = "Slack API call failed (%s)".format(rc['error']) +raise AirflowException(msg) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/operators/slack_operator.py -- diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py index 8b21211..8398a7a 100644 --- 
a/airflow/operators/slack_operator.py +++ b/airflow/operators/slack_operator.py @@ -14,9 +14,9 @@ import json -from slackclient import SlackClient from airflow.models import BaseOper
incubator-airflow git commit: [AIRFLOW-1787] Fix task instance batch clear and set state bugs
Repository: incubator-airflow Updated Branches: refs/heads/master 1a7b63eb1 -> 313f5bac4 [AIRFLOW-1787] Fix task instance batch clear and set state bugs Fixes Batch clear in Task Instances view is not working for task instances in RUNNING state and all batch operations in Task instances view cannot work when manually triggered task instances are selected because they have a different execution date format. Closes #2759 from yrqls21/fix-ti-batch-clear-n -set-state-bugs Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/313f5bac Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/313f5bac Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/313f5bac Branch: refs/heads/master Commit: 313f5bac4a3f804094bcd583e0e5fbc3b5f405bb Parents: 1a7b63e Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Tue Nov 7 11:22:58 2017 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Nov 7 11:23:00 2017 -0800 -- airflow/utils/dates.py| 14 ++ airflow/www/views.py | 30 -- tests/utils/test_dates.py | 9 + 3 files changed, 43 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/utils/dates.py -- diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 43b87f4..81e1c2c 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -225,3 +225,17 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0): second=second, microsecond=microsecond) return today - timedelta(days=n) + + +def parse_execution_date(execution_date_str): +""" +Parse execution date string to datetime object. +""" +try: +# Execution date follows execution date format of scheduled executions, +# e.g. '2017-11-02 00:00:00' +return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S') +except ValueError: +# Execution date follows execution date format of manually triggered executions, +# e.g. 
'2017-11-05 16:18:30..989729' +return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index 81c44b6..a6d788e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -76,7 +76,7 @@ from airflow.utils.json import json_ser from airflow.utils.state import State from airflow.utils.db import create_session, provide_session from airflow.utils.helpers import alchemy_to_dict -from airflow.utils.dates import infer_time_unit, scale_time_units +from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date from airflow.www import utils as wwwutils from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm from airflow.www.validators import GreaterEqualThan @@ -2502,23 +2502,32 @@ class TaskInstanceModelView(ModelViewOnly): try: TI = models.TaskInstance +dag_to_task_details = {} dag_to_tis = {} -for id in ids: -task_id, dag_id, execution_date = id.split(',') +# Collect dags upfront as dagbag.get_dag() will reset the session +for id_str in ids: +task_id, dag_id, execution_date = id_str.split(',') +dag = dagbag.get_dag(dag_id) +task_details = dag_to_task_details.setdefault(dag, []) +task_details.append((task_id, execution_date)) -ti = session.query(TI).filter(TI.task_id == task_id, - TI.dag_id == dag_id, - TI.execution_date == execution_date).one() +for dag, task_details in dag_to_task_details.items(): +for task_id, execution_date in task_details: +execution_date = parse_execution_date(execution_date) -dag = dagbag.get_dag(dag_id) -tis = dag_to_tis.setdefault(dag, []) -tis.append(ti) +ti = session.query(TI).filter(TI.task_id == task_id, + TI.dag_id == dag.dag_id, + TI.execution_date == execution_date).one() + +tis = dag_to_tis.setdefault(dag, []) +tis.append(ti) for dag, tis in dag_to_tis.items(): mode
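A standalone version of the parsing logic this commit adds (the real function lives in airflow/utils/dates.py) is shown below. Note that the example timestamp in the commit text above appears garbled by extraction ('..' before the microseconds); the sketch assumes the standard single-dot microsecond format: scheduled runs carry second-resolution execution dates, manually triggered runs carry microseconds, so try the strict format first and fall back.

```python
from datetime import datetime


def parse_execution_date(execution_date_str):
    """Parse either execution-date string format the UI can receive."""
    try:
        # Scheduled runs: second resolution, e.g. '2017-11-02 00:00:00'
        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Manually triggered runs carry microseconds,
        # e.g. '2017-11-05 16:18:30.989729' (assumed single-dot format)
        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S.%f')
```

This is what lets the batch clear/set-state views handle manually triggered task instances, whose execution dates previously failed to parse.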
incubator-airflow git commit: [AIRFLOW-1780] Fix long output lines with unicode from hanging parent
Repository: incubator-airflow Updated Branches: refs/heads/master 3fde1043f -> 1a7b63eb1 [AIRFLOW-1780] Fix long output lines with unicode from hanging parent Fix long task output lines with unicode from hanging parent process. Tasks that create output that gets piped into a file in the parent airflow process would hang if they had long lines with unicode characters. Closes #2758 from aoen/ddavydov-- fix_unicode_output_string Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1a7b63eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1a7b63eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1a7b63eb Branch: refs/heads/master Commit: 1a7b63eb16ffa1cb97cb09f71997dcd39f28e645 Parents: 3fde104 Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Mon Nov 6 16:01:59 2017 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Nov 6 16:02:02 2017 -0800 -- airflow/task_runner/base_task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1a7b63eb/airflow/task_runner/base_task_runner.py -- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index 6a07db2..f4b4f2d 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin): line = line.decode('utf-8') if len(line) == 0: break -self.log.info('Subtask: %s', line.rstrip('\n')) +self.log.info(u'Subtask: %s', line.rstrip('\n')) def run_command(self, run_with, join_args=False): """
[jira] [Commented] (AIRFLOW-1143) Tasks rejected by workers get stuck in QUEUED
[ https://issues.apache.org/jira/browse/AIRFLOW-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241002#comment-16241002 ] Dan Davydov commented on AIRFLOW-1143: -- It may fix it, I'm not sure we were ever sure of the specific root cause. I think to have full confidence we should add a test that checks for this. > Tasks rejected by workers get stuck in QUEUED > - > > Key: AIRFLOW-1143 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1143 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler > Reporter: Dan Davydov >Assignee: Gerard Toonstra > > If the scheduler schedules a task that is sent to a worker that then rejects > the task (e.g. because one of the dependencies of the tasks became bad, like > the pool became full), the task will be stuck in the QUEUED state. We hit > this trying to switch from invoking the scheduler "airflow scheduler -n 5" to > just "airflow scheduler". > Restarting the scheduler fixes this because it cleans up orphans, but we > shouldn't have to restart the scheduler to fix these problems (the missing > job heartbeats should make the scheduler requeue the task). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (AIRFLOW-1780) Long Unicode Characters In BashOperator Output Cause Task Hang
[ https://issues.apache.org/jira/browse/AIRFLOW-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1780: - Summary: Long Unicode Characters In BashOperator Output Cause Task Hang (was: Long Unicode Characters Cause Logging) > Long Unicode Characters In BashOperator Output Cause Task Hang > -- > > Key: AIRFLOW-1780 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1780 > Project: Apache Airflow > Issue Type: Bug > Reporter: Dan Davydov > Assignee: Dan Davydov > Priority: Major > > Conditions to replicate: > Create a DAG with a single BashOperator that logs ~10 long lines of text with > at least one unicode character (e.g. cat a file with these contents) > Use BashTaskRunner > Log to a file > Run a worker that picks up the task, or an airflow run --local command > Behavior: > The BashOperator/cat command hangs > Most likely this is due to a pipe issue, where the unicode characters are > filling up the pipe and .format() is not able to process partial unicode > characters. Interestingly, removing one of the conditions above (e.g. logging > to stdout instead of a file, or having short lines) doesn't cause the issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1780) Long Unicode Characters Cause Logging
Dan Davydov created AIRFLOW-1780: Summary: Long Unicode Characters Cause Logging Key: AIRFLOW-1780 URL: https://issues.apache.org/jira/browse/AIRFLOW-1780 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Assignee: Dan Davydov Priority: Major Conditions to replicate: Create a DAG with a single BashOperator that logs ~10 long lines of text with at least one unicode character (e.g. cat a file with these contents) Use BashTaskRunner Log to a file Run a worker that picks up the task, or an airflow run --local command Behavior: The BashOperator/cat command hangs Most likely this is due to a pipe issue, where the unicode characters are filling up the pipe and .format() is not able to process partial unicode characters. Interestingly, removing one of the conditions above (e.g. logging to stdout instead of a file, or having short lines) doesn't cause the issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1733) BashOperator readline can hang
Dan Davydov created AIRFLOW-1733: Summary: BashOperator readline can hang Key: AIRFLOW-1733 URL: https://issues.apache.org/jira/browse/AIRFLOW-1733 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov If the child bash process dies for some reason, the BashOperator will hang since we use a blocking readline call to get output from the child. Instead we should read asynchronously or poll the child bash process like described in the solutions here: https://stackoverflow.com/questions/18421757/live-output-from-subprocess-command -- This message was sent by Atlassian JIRA (v6.4.14#64029)
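One way to avoid the blocking readline described above, in the spirit of the linked StackOverflow solutions, is to move the blocking call into a helper thread and have the parent poll both an output queue and the child's exit status, so a dead or silent child can never wedge the parent. This is an illustrative sketch under those assumptions, not Airflow's actual BashOperator implementation:

```python
import queue
import subprocess
import sys
import threading

def _drain(pipe, q):
    # Runs in a helper thread: the blocking readline lives here, so the
    # parent process is never stuck waiting on a dead child's pipe.
    for raw in iter(pipe.readline, b''):
        q.put(raw.decode('utf-8').rstrip('\n'))
    pipe.close()

def run_and_stream(cmd):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    q = queue.Queue()
    t = threading.Thread(target=_drain, args=(proc.stdout, q), daemon=True)
    t.start()
    lines = []
    while True:
        try:
            lines.append(q.get(timeout=0.1))
        except queue.Empty:
            # No output right now: poll the child instead of blocking.
            if proc.poll() is not None:
                t.join()                 # reader saw EOF; flush leftovers
                while not q.empty():
                    lines.append(q.get_nowait())
                break
    return proc.returncode, lines

rc, lines = run_and_stream([sys.executable, '-c', "print('one'); print('two')"])
```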
incubator-airflow git commit: [AIRFLOW-1631] Fix timing issue in unit test
Repository: incubator-airflow Updated Branches: refs/heads/master cdfced324 -> cb868f49f [AIRFLOW-1631] Fix timing issue in unit test LocalWorker instances wait 1 sec for each unit of work they perform, so getting the response of a processor takes at least 1 sec after the unit of work. Increasing timeout in LocalTaskJobTest.test_mark_success_no_kill and decreasing the poking time on local executor checking for results in the unlimited implementation. Closes #2699 from edgarRd/erod-fix-timing-failure Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb868f49 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb868f49 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb868f49 Branch: refs/heads/master Commit: cb868f49f0b6144bd6488cbde5bdf2811001f6ac Parents: cdfced3 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Oct 17 17:51:34 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Oct 17 17:51:58 2017 -0700 -- airflow/executors/local_executor.py | 2 +- tests/jobs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb868f49/airflow/executors/local_executor.py -- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 9b4f8e1..71bee22 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -160,7 +160,7 @@ class LocalExecutor(BaseExecutor): def end(self): while self.executor.workers_active > 0: self.executor.sync() -time.sleep(1) +time.sleep(0.5) class _LimitedParallelism(object): """Implements LocalExecutor with limited parallelism using a task queue to http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb868f49/tests/jobs.py -- diff --git a/tests/jobs.py b/tests/jobs.py index ba08fd6..88589d8 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ 
-860,7 +860,7 @@ class LocalTaskJobTest(unittest.TestCase): session.merge(ti) session.commit() -process.join(timeout=5) +process.join(timeout=10) self.assertFalse(process.is_alive()) ti.refresh_from_db() self.assertEqual(State.SUCCESS, ti.state)
incubator-airflow git commit: [AIRFLOW-1631] Fix local executor unbound parallelism
Repository: incubator-airflow Updated Branches: refs/heads/master 707ab6952 -> cdfced324 [AIRFLOW-1631] Fix local executor unbound parallelism Before, if unlimited parallelism was used passing `0` for the parallelism value, the local executor would stall execution since no worker was being created, violating the BaseExecutor contract on the parallelism option. Now, if unbound parallelism is used, processes will be created on demand for each task submitted for execution. Closes #2658 from edgarRd/erod-localexecutor-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdfced32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdfced32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdfced32 Branch: refs/heads/master Commit: cdfced3248c7f14b639919c093f4f3042deb754b Parents: 707ab69 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Oct 17 11:39:06 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Oct 17 11:39:22 2017 -0700 -- airflow/executors/local_executor.py| 205 +++- tests/executors/test_local_executor.py | 74 ++ 2 files changed, 244 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/airflow/executors/local_executor.py -- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index f9eceb3..9b4f8e1 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -11,6 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +LocalExecutor runs tasks by spawning processes in a controlled fashion in different +modes. 
Given that BaseExecutor has the option to receive a `parallelism` parameter to +limit the number of process spawned, when this parameter is `0` the number of processes +that LocalExecutor can spawn is unlimited. + +The following strategies are implemented: +1. Unlimited Parallelism (self.parallelism == 0): In this strategy, LocalExecutor will +spawn a process every time `execute_async` is called, that is, every task submitted to the +LocalExecutor will be executed in its own process. Once the task is executed and the +result stored in the `result_queue`, the process terminates. There is no need for a +`task_queue` in this approach, since as soon as a task is received a new process will be +allocated to the task. Processes used in this strategy are of class LocalWorker. + +2. Limited Parallelism (self.parallelism > 0): In this strategy, the LocalExecutor spawns +the number of processes equal to the value of `self.parallelism` at `start` time, +using a `task_queue` to coordinate the ingestion of tasks and the work distribution among +the workers, which will take a task as soon as they are ready. During the lifecycle of +the LocalExecutor, the worker processes are running waiting for tasks, once the +LocalExecutor receives the call to shutdown the executor a poison token is sent to the +workers to terminate them. Processes used in this strategy are of class QueuedLocalWorker. + +Arguably, `SequentialExecutor` could be thought as a LocalExecutor with limited +parallelism of just 1 worker, i.e. `self.parallelism = 1`. +This option could lead to the unification of the executor implementations, running +locally, into just one `LocalExecutor` with multiple modes. 
+""" import multiprocessing import subprocess @@ -18,20 +45,63 @@ import time from builtins import range -from airflow import configuration from airflow.executors.base_executor import BaseExecutor from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State -PARALLELISM = configuration.get('core', 'PARALLELISM') - class LocalWorker(multiprocessing.Process, LoggingMixin): + +"""LocalWorker Process implementation to run airflow commands. Executes the given +command and puts the result into a result queue when done, terminating execution.""" + +def __init__(self, result_queue): +""" +:param result_queue: the queue to store result states tuples (key, State) +:type result_queue: multiprocessing.Queue +""" +super(LocalWorker, self).__init__() +self.daemon = True +self.result_queue = result_queue +self.key = None +self.command = None + +def execute_work(self, key, command): +""" +Exe
[jira] [Commented] (AIRFLOW-1670) Make Airflow Viewable By Color-blind Users
[ https://issues.apache.org/jira/browse/AIRFLOW-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202413#comment-16202413 ] Dan Davydov commented on AIRFLOW-1670: -- Took a stab at it here: https://github.com/apache/incubator-airflow/pull/2654 But it wasn't well received; we should have a dedicated color-blind mode instead. > Make Airflow Viewable By Color-blind Users > -- > > Key: AIRFLOW-1670 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1670 > Project: Apache Airflow > Issue Type: Bug > Reporter: Dan Davydov > Assignee: Dan Davydov > > Currently the success and failed states are hard to differentiate for Airflow > users; RGB(204,255,204) for the success state works better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view
Repository: incubator-airflow Updated Branches: refs/heads/master 98b4df945 -> 9f2c16a0a [AIRFLOW-1681] Add batch clear in task instance view Allow users to batch clear selected task instance(s) in task instance view. Only state(s) of selected task instance(s) will be cleared--no upstream nor downstream task instance will be affected. DAG(s) involved will be set to "RUNNING" state, same as existing "clear" operation. Keeping both "Delete" and "Clear" operations for more smooth user habit transition--informing DAG state change in pop-up (check screenshots). Closes #2681 from yrqls21/add-batch-clear-in-task- instance-view Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0 Branch: refs/heads/master Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5 Parents: 98b4df9 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Oct 11 14:05:33 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Oct 11 14:05:35 2017 -0700 -- airflow/www/views.py | 62 +++ 1 file changed, 31 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index bc63b5b..81ee61f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -37,7 +37,7 @@ import sqlalchemy as sqla from sqlalchemy import or_, desc, and_, union_all from flask import ( -abort, redirect, url_for, request, Markup, Response, current_app, render_template, +abort, redirect, url_for, request, Markup, Response, current_app, render_template, make_response) from flask_admin import BaseView, expose, AdminIndexView from flask_admin.contrib.sqla import ModelView @@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly): 
'start_date', 'end_date', 'duration', 'job_id', 'hostname', 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url') -can_delete = True page_size = PAGE_SIZE @action('set_running', "Set state to 'running'", None) @@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly): def action_set_retry(self, ids): self.set_task_instance_state(ids, State.UP_FOR_RETRY) -@action('delete', -lazy_gettext('Delete'), -lazy_gettext('Are you sure you want to delete selected records?')) -def action_delete(self, ids): -""" -As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete(). - -TODO: this method should be removed once the below bug is fixed on Flask-Admin side. -https://github.com/flask-admin/flask-admin/issues/1226 -""" -if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): -self.delete_task_instances(ids) -else: -super(TaskInstanceModelView, self).action_delete(ids) - @provide_session -def set_task_instance_state(self, ids, target_state, session=None): +@action('clear', +lazy_gettext('Clear'), +lazy_gettext( +'Are you sure you want to clear the state of the selected task instance(s)' +' and set their dagruns to the running state?')) +def action_clear(self, ids, session=None): try: TI = models.TaskInstance -count = len(ids) + +dag_to_tis = {} + for id in ids: task_id, dag_id, execution_date = id.split(',') -execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') + ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date).one() -ti.state = target_state + +dag = dagbag.get_dag(dag_id) +tis = dag_to_tis.setdefault(dag, []) +tis.append(ti) + +for dag, tis in dag_to_tis.items(): +models.clear_task_instances(tis, session, dag=dag) + session.commit() -flash( -"{count} task instances were set to '{target_state}'".format(**locals())) +flash("{0} task instances have been cleared".format(len(ids))) + except Exception as ex: if no
[jira] [Resolved] (AIRFLOW-1697) Mode to disable Airflow Charts
[ https://issues.apache.org/jira/browse/AIRFLOW-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-1697. -- Resolution: Fixed > Mode to disable Airflow Charts > -- > > Key: AIRFLOW-1697 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1697 > Project: Apache Airflow > Issue Type: Bug > Components: ui > Reporter: Dan Davydov > Assignee: Dan Davydov > > Mode to disable Airflow Charts -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1697] Mode to disable charts endpoint
Repository: incubator-airflow Updated Branches: refs/heads/master ebe715c56 -> 21e94c7d1 [AIRFLOW-1697] Mode to disable charts endpoint Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/21e94c7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/21e94c7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/21e94c7d Branch: refs/heads/master Commit: 21e94c7d1594c5e0806d9e1ae1205a41bf98b5d3 Parents: ebe715c Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Mon Oct 9 14:46:38 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Oct 10 11:33:50 2017 -0700 -- UPDATING.md | 2 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/www/app.py | 7 +-- airflow/www/views.py | 9 - scripts/ci/airflow_travis.cfg| 1 + 5 files changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/UPDATING.md -- diff --git a/UPDATING.md b/UPDATING.md index 6a0b8bc..ebcb5cd 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -270,6 +270,8 @@ supported and will be removed entirely in Airflow 2.0 Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285) +- The config value secure_mode will default to True which will disable some insecure endpoints/features + ### Known Issues There is a report that the default of "-1" for num_runs creates an issue where errors are reported while parsing tasks. It was not confirmed, but a workaround was found by changing the default back to `None`. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index b051583..dee6dc7 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -117,6 +117,10 @@ default_impersonation = # What security module to use (for example kerberos): security = +# If set to False enables some unsecure features like Charts. In 2.0 will +# default to True. +secure_mode = False + # Turn unit test mode on (overwrites many configuration options with test # values at runtime) unit_test_mode = False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/www/app.py -- diff --git a/airflow/www/app.py b/airflow/www/app.py index bbb9410..dfdc04c 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -22,6 +22,7 @@ from flask_wtf.csrf import CSRFProtect csrf = CSRFProtect() import airflow +from airflow import configuration as conf from airflow import models, LoggingMixin from airflow.settings import Session @@ -69,8 +70,10 @@ def create_app(config=None, testing=False): av(vs.Airflow(name='DAGs', category='DAGs')) av(vs.QueryView(name='Ad Hoc Query', category="Data Profiling")) -av(vs.ChartModelView( -models.Chart, Session, name="Charts", category="Data Profiling")) + +if not conf.getboolean('core', 'secure_mode'): +av(vs.ChartModelView( +models.Chart, Session, name="Charts", category="Data Profiling")) av(vs.KnownEventView( models.KnownEvent, Session, name="Known Events", category="Data Profiling")) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index ad27238..bc63b5b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -37,7 +37,8 @@ import sqlalchemy as sqla from sqlalchemy import or_, desc, and_, union_all from flask import ( 
-redirect, url_for, request, Markup, Response, current_app, render_template, make_response) +abort, redirect, url_for, request, Markup, Response, current_app, render_template, +make_response) from flask_admin import BaseView, expose, AdminIndexView from flask_admin.contrib.sqla import ModelView from flask_admin.actions import action @@ -299,6 +300,9 @@ class Airflow(BaseView): def chart_data(self): from airflow import macros
[jira] [Created] (AIRFLOW-1697) Mode to disable Airflow Charts
Dan Davydov created AIRFLOW-1697: Summary: Mode to disable Airflow Charts Key: AIRFLOW-1697 URL: https://issues.apache.org/jira/browse/AIRFLOW-1697 Project: Apache Airflow Issue Type: Bug Components: ui Reporter: Dan Davydov Assignee: Dan Davydov Mode to disable Airflow Charts -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1681) Create way to batch retry task instances in the CRUD
Dan Davydov created AIRFLOW-1681: Summary: Create way to batch retry task instances in the CRUD Key: AIRFLOW-1681 URL: https://issues.apache.org/jira/browse/AIRFLOW-1681 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov The old way to batch retry tasks was to select them on the Task Instances page on the webserver and do a With Selected -> Delete. This no longer works, as you will get overlapping task instance logs (e.g. the first retry log will be placed in the same location as the first try log). We need an option in the CRUD called With Selected -> Retry that does the same thing as With Selected -> Delete but follows the logic for task clearing (sets state to none, increases max_tries). Once this feature is stable, With Selected -> Delete should probably be removed, as it leads to bad states with the logs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1670) Make Airflow Viewable By Color-blind Users
Dan Davydov created AIRFLOW-1670: Summary: Make Airflow Viewable By Color-blind Users Key: AIRFLOW-1670 URL: https://issues.apache.org/jira/browse/AIRFLOW-1670 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Assignee: Dan Davydov Currently the success and failed states are hard to differentiate for Airflow users, RGB(204,255,204) for the success state works better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1651) Accessibility Mode
Dan Davydov created AIRFLOW-1651: Summary: Accessibility Mode Key: AIRFLOW-1651 URL: https://issues.apache.org/jira/browse/AIRFLOW-1651 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Colorblind users have a lot of trouble using Airflow (e.g. red-green colorblind users can't tell apart failed and successful tasks). We should have an accessibility mode to help them differentiate them (colors with more contrast and maybe different shapes). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1621] Add tests for server side paging
Repository: incubator-airflow Updated Branches: refs/heads/master 6e520704f -> 656d045e9 [AIRFLOW-1621] Add tests for server side paging Adding tests to check logic included in AIRFLOW-1519. Closes #2614 from edgarRd/erod-ui-dags-paging- tests Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/656d045e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/656d045e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/656d045e Branch: refs/heads/master Commit: 656d045e90bf67ca484a3778b2a07a419bfb324a Parents: 6e52070 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Sep 19 15:52:26 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Sep 19 15:52:30 2017 -0700 -- airflow/www/utils.py| 26 tests/www/test_utils.py | 73 2 files changed, 87 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/656d045e/airflow/www/utils.py -- diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 344a4e9..96293a2 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -76,6 +76,20 @@ class DataProfilingMixin(object): ) +def get_params(**kwargs): +params = [] +for k, v in kwargs.items(): +if k == 'showPaused': +# True is default or None +if v or v is None: +continue +params.append('{}={}'.format(k, v)) +elif v: +params.append('{}={}'.format(k, v)) +params = sorted(params, key=lambda x: x.split('=')[0]) +return '&'.join(params) + + def generate_pages(current_page, num_of_pages, search=None, showPaused=None, window=7): """ @@ -103,18 +117,6 @@ def generate_pages(current_page, num_of_pages, the HTML string of the paging component """ -def get_params(**kwargs): -params = [] -for k, v in kwargs.items(): -if k == 'showPaused': -# True is default or None -if v or v is None: -continue -params.append('{}={}'.format(k, v)) -elif v: -params.append('{}={}'.format(k, v)) -return 
'&'.join(params) - void_link = 'javascript:void(0)' first_node = """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/656d045e/tests/www/test_utils.py -- diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py index bb0860f..d13a49d 100644 --- a/tests/www/test_utils.py +++ b/tests/www/test_utils.py @@ -13,6 +13,7 @@ # limitations under the License. import unittest +from xml.dom import minidom from airflow.www import utils @@ -31,6 +32,78 @@ class UtilsTest(unittest.TestCase): def test_sensitive_variable_should_be_hidden_ic(self): self.assertTrue(utils.should_hide_value_for_key("GOOGLE_API_KEY")) +def check_generate_pages_html(self, current_page, total_pages, + window=7, check_middle=False): +extra_links = 4 # first, prev, next, last +html_str = utils.generate_pages(current_page, total_pages) + +# dom parser has issues with special and +html_str = html_str.replace('', '') +html_str = html_str.replace('', '') +dom = minidom.parseString(html_str) +self.assertIsNotNone(dom) + +ulist = dom.getElementsByTagName('ul')[0] +ulist_items = ulist.getElementsByTagName('li') +self.assertEqual(min(window, total_pages) + extra_links, len(ulist_items)) + +def get_text(nodelist): +rc = [] +for node in nodelist: +if node.nodeType == node.TEXT_NODE: +rc.append(node.data) +return ''.join(rc) + +page_items = ulist_items[2:-2] +mid = int(len(page_items) / 2) +for i, item in enumerate(page_items): +a_node = item.getElementsByTagName('a')[0] +href_link = a_node.getAttribute('href') +node_text = get_text(a_node.childNodes) +if node_text == str(current_page + 1): +if check_middle: +self.assertEqual(mid, i) +self.assertEqual('javascript:void(0)', a_node.getAttribute('href')) +self.assertIn('active', item.getAttribute('class')) +else: +link_str = '?page=' + str(int(node_text) - 1) +self.assertEqual(link_str, href_link) + +def te
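For reference, the `get_params` helper extracted to module level in the diff above builds a sorted query string, dropping `showPaused` when it holds its default truthy value (True or None) and dropping any other falsy value. Copied here so it can be run standalone:

```python
def get_params(**kwargs):
    """Build a sorted query string from keyword arguments.

    showPaused is omitted when True or None (its default); every other
    key is included only when its value is truthy.
    """
    params = []
    for k, v in kwargs.items():
        if k == 'showPaused':
            # True is the default, or None: omit the parameter entirely
            if v or v is None:
                continue
            params.append('{}={}'.format(k, v))
        elif v:
            params.append('{}={}'.format(k, v))
    # Sorting by key makes the output deterministic, which the paging
    # tests in this commit rely on.
    params = sorted(params, key=lambda x: x.split('=')[0])
    return '&'.join(params)

assert get_params(page=2, search='etl', showPaused=True) == 'page=2&search=etl'
assert get_params(showPaused=False, page=0) == 'showPaused=False'
```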
[jira] [Created] (AIRFLOW-1624) Consolidate airflow clear and airflow run
Dan Davydov created AIRFLOW-1624: Summary: Consolidate airflow clear and airflow run Key: AIRFLOW-1624 URL: https://issues.apache.org/jira/browse/AIRFLOW-1624 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Now that task clearing behaves in a non-destructive way (it creates new task instance runs), it makes sense to consolidate it with run in the UI. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1620) Support granular and customizable hook retries
Dan Davydov created AIRFLOW-1620: Summary: Support granular and customizable hook retries Key: AIRFLOW-1620 URL: https://issues.apache.org/jira/browse/AIRFLOW-1620 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov We need some kind of abstraction to allow for granular and customized retry handling for hooks. For example, some users might want to catch certain types of exceptions and retry with a customized policy (e.g. exponential backoff), depending on their use case. One possible solution is to use dependency injection (e.g. hooks need to be passed into operators). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1519] Add server side paging in DAGs list
Repository: incubator-airflow Updated Branches: refs/heads/master 6632b0ce1 -> b6d2e0a46 [AIRFLOW-1519] Add server side paging in DAGs list Airflow's main page previously did paging client- side via a jQuery plugin (DataTable) which was very slow at loading all DAGs. The browser would load all DAGs in the table. The result was performance degradation when having a number of DAGs in the range of 1K. This commit implements server-side paging using the webserver page size setting, sending to the browser only the elements for the specific page. Closes #2531 from edgarRd/erod-ui-dags-paging Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b6d2e0a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b6d2e0a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b6d2e0a4 Branch: refs/heads/master Commit: b6d2e0a46978e93e16576604624f57d1388814f2 Parents: 6632b0c Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Fri Sep 15 16:41:25 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Sep 15 16:41:29 2017 -0700 -- airflow/www/static/bootstrap3-typeahead.min.js | 21 airflow/www/templates/airflow/dags.html| 76 +++- airflow/www/utils.py | 121 airflow/www/views.py | 105 + licenses/LICENSE-typeahead.txt | 13 +++ 5 files changed, 308 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b6d2e0a4/airflow/www/static/bootstrap3-typeahead.min.js -- diff --git a/airflow/www/static/bootstrap3-typeahead.min.js b/airflow/www/static/bootstrap3-typeahead.min.js new file mode 100644 index 000..23aac4e --- /dev/null +++ b/airflow/www/static/bootstrap3-typeahead.min.js @@ -0,0 +1,21 @@ +/* = + * bootstrap3-typeahead.js v4.0.2 + * https://github.com/bassjobsen/Bootstrap-3-Typeahead + * = + * Original written by @mdo and @fat + * = + * Copyright 2014 Bass Jobsen @bassjobsen + * + * Licensed under 
the Apache License, Version 2.0 (the 'License'); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * */ +[minified bootstrap3-typeahead.min.js body omitted]
[jira] [Updated] (AIRFLOW-1584) Remove the insecure /headers endpoints
[ https://issues.apache.org/jira/browse/AIRFLOW-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1584: - Description: Impact: An XSS vulnerability on Airflow would be able to read a user's auth_proxy cookie, granting the attacker access to any other InternalAuth-gated application on Airbnb's network. Target: The endpoint at https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or https://airflow-precious.d.musta.ch/admin/airflow/headers Description: The endpoint listed in the Target section returns the headers sent by the user's browser, including the Cookie header. Since this endpoint can be called by JavaScript on the Airflow domain, this allows JS running on Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially other) cookie sets. This means that malicious JavaScript can steal the auth_proxy cookie and use it to authenticate to other InternalAuth services. This can be demonstrated by running the following JavaScript snippet in any Airflow tab: {code:java} $.get("/admin/airflow/headers", function(data) {alert(data['headers']['Cookie']);}) {code} Remediation: Disable this endpoint entirely. If some of the headers are important they can be added to the gunicorn request log format. was: Impact: An XSS vulnerability on Airflow would be able to read a user's auth_proxy cookie, granting the attacker access to any other InternalAuth-gated application on Airbnb's network. Target: The endpoint at https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or https://airflow-precious.d.musta.ch/admin/airflow/headers Description: The endpoint listed in the Target section returns the headers sent by the user's browser, including the Cookie header. Since this endpoint can be called by JavaScript on the Airflow domain, this allows JS running on Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially other) cookie sets. 
This means that malicious JavaScript can steal the auth_proxy cookie and use it to authenticate to other InternalAuth services. {code:java} This can be demonstrated by running the following JavaScript snippet in any Airflow tab: $.get("/admin/airflow/headers", function(data) {alert(data['headers']['Cookie']);}) {code} Remediation: Disable this endpoint entirely. If some of the headers are important they can be added to the gunicorn request log format. > Remove the insecure /headers endpoints > -- > > Key: AIRFLOW-1584 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1584 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Reporter: Dan Davydov >Assignee: Dan Davydov > > Impact: An XSS vulnerability on Airflow would be able to read a user's > auth_proxy cookie, granting the attacker access to any other > InternalAuth-gated application on Airbnb's network. > Target: The endpoint at > https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or > https://airflow-precious.d.musta.ch/admin/airflow/headers > Description: The endpoint listed in the Target section returns the headers > sent by the user's browser, including the Cookie header. Since this endpoint > can be called by JavaScript on the Airflow domain, this allows JS running on > Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially > other) cookie sets. This means that malicious JavaScript can steal the > auth_proxy cookie and use it to authenticate to other InternalAuth services. > This can be demonstrated by running the following JavaScript snippet in any > Airflow tab: > {code:java} > $.get("/admin/airflow/headers", function(data) > {alert(data['headers']['Cookie']);}) > {code} > Remediation: Disable this endpoint entirely. If some of the headers are > important they can be added to the gunicorn request log format. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1584) Remove the insecure /headers endpoints
Dan Davydov created AIRFLOW-1584: Summary: Remove the insecure /headers endpoints Key: AIRFLOW-1584 URL: https://issues.apache.org/jira/browse/AIRFLOW-1584 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov Assignee: Dan Davydov Impact: An XSS vulnerability on Airflow would be able to read a user's auth_proxy cookie, granting the attacker access to any other InternalAuth-gated application on Airbnb's network. Target: The endpoint at https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or https://airflow-precious.d.musta.ch/admin/airflow/headers Description: The endpoint listed in the Target section returns the headers sent by the user's browser, including the Cookie header. Since this endpoint can be called by JavaScript on the Airflow domain, this allows JS running on Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially other) cookie sets. This means that malicious JavaScript can steal the auth_proxy cookie and use it to authenticate to other InternalAuth services. This can be demonstrated by running the following JavaScript snippet in any Airflow tab: $.get("/admin/airflow/headers", function(data) {alert(data['headers']['Cookie']);}) Remediation: Disable this endpoint entirely. If some of the headers are important they can be added to the gunicorn request log format. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
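The remediation note suggests moving any useful headers into the gunicorn request log instead of serving them back to the browser. A minimal sketch of that idea, assuming a standard gunicorn config file (the filename and the chosen headers are illustrative, not the actual Airflow deployment config); gunicorn's `%({header}i)s` directive logs a request header:

```python
# gunicorn.conf.py -- illustrative sketch, not Airflow's shipped config.
# Instead of an endpoint that echoes headers (and cookies) to JavaScript,
# capture selected request headers server-side in the access log.
accesslog = "-"  # write access log to stdout
access_log_format = (
    '%(h)s "%(r)s" %(s)s '
    'referer=%({referer}i)s ua="%({user-agent}i)s"'
)
```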
incubator-airflow git commit: [AIRFLOW-1574] add 'to' attribute to templated vars of email operator
Repository: incubator-airflow Updated Branches: refs/heads/master 8f1ec4dee -> 2d4069448 [AIRFLOW-1574] add 'to' attribute to templated vars of email operator The 'to' field may sometimes need to be template-able when you have a DAG that is using XCOM to find the user to send the information to (i.e. we have a form that a user submits and based on the ldap user we send this specific user the information). It's a rather easy fix to add the 'to' user to the template-able options. Closes #2577 from Acehaidrey/AIRFLOW-1574 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d406944 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d406944 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d406944 Branch: refs/heads/master Commit: 2d40694482df7c9c1c02a61e11d51bedb9034a93 Parents: 8f1ec4d Author: Ace Haidrey <ahaid...@pandora.com> Authored: Thu Sep 7 11:53:16 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Sep 7 11:53:20 2017 -0700 -- airflow/operators/email_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d406944/airflow/operators/email_operator.py -- diff --git a/airflow/operators/email_operator.py b/airflow/operators/email_operator.py index 5167a7a..6eaae95 100644 --- a/airflow/operators/email_operator.py +++ b/airflow/operators/email_operator.py @@ -36,7 +36,7 @@ class EmailOperator(BaseOperator): :type bcc: list or string (comma or semicolon delimited) """ -template_fields = ('subject', 'html_content') +template_fields = ('to', 'subject', 'html_content') template_ext = ('.html',) ui_color = '#e6faf9'
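The one-line diff works because Airflow renders every attribute listed in an operator's `template_fields` before the task executes. The sketch below imitates that mechanism with the stdlib's `string.Template` as a stand-in for Airflow's Jinja engine; `FakeEmailOperator`, its `render_template_fields` method, and the context keys are hypothetical names for illustration, not Airflow's actual API:

```python
from string import Template


class FakeEmailOperator:
    # Mirrors the patched tuple from the commit; any attribute named here
    # gets rendered against the run context before execution.
    template_fields = ('to', 'subject', 'html_content')

    def __init__(self, to, subject, html_content):
        self.to = to
        self.subject = subject
        self.html_content = html_content

    def render_template_fields(self, context):
        # Render each declared field in place, as Airflow does with Jinja.
        for field in self.template_fields:
            rendered = Template(getattr(self, field)).safe_substitute(context)
            setattr(self, field, rendered)


op = FakeEmailOperator(
    to='$user_email',                 # resolved at runtime, e.g. from XCom
    subject='Run for $ds done',
    html_content='Hello $user_email',
)
op.render_template_fields({'user_email': 'user@example.com', 'ds': '2017-09-07'})
print(op.to)       # user@example.com
print(op.subject)  # Run for 2017-09-07 done
```

Before this change, `to` was not in the tuple, so a templated recipient string would have been passed to the mail backend unrendered.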
incubator-airflow git commit: [AIRFLOW-1541] Add channel to template fields of slack_operator
Repository: incubator-airflow Updated Branches: refs/heads/master b1f902e63 -> 9450d8db6 [AIRFLOW-1541] Add channel to template fields of slack_operator Closes #2549 from Acehaidrey/AIRFLOW-1541 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9450d8db Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9450d8db Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9450d8db Branch: refs/heads/master Commit: 9450d8db6f005d3c5ad84c23c484bbb119e112a8 Parents: b1f902e Author: Ace Haidrey <ahaid...@pandora.com> Authored: Wed Aug 30 11:52:47 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Aug 30 11:52:48 2017 -0700 -- airflow/operators/slack_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9450d8db/airflow/operators/slack_operator.py -- diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py index 2e6d426..86659d9 100644 --- a/airflow/operators/slack_operator.py +++ b/airflow/operators/slack_operator.py @@ -86,7 +86,7 @@ class SlackAPIPostOperator(SlackAPIOperator): :type attachments: array of hashes """ -template_fields = ('username', 'text', 'attachments') +template_fields = ('username', 'text', 'attachments', 'channel') ui_color = '#FFBA40' @apply_defaults
[jira] [Commented] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128134#comment-16128134 ] Dan Davydov commented on AIRFLOW-1311: -- Edgar assigning to you, feel free to mark as dupe or merge if you have an existing JIRA ticket. > Improve Webserver Load Time For Large DAGs > -- > > Key: AIRFLOW-1311 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1311 > Project: Apache Airflow > Issue Type: Bug > Components: webserver > Reporter: Dan Davydov >Assignee: Edgar Rodriguez > > Large DAGs can take an extremely long time to load in the Airflow UI > (minutes/timeout). > The fixes are as follows: > 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing > tasks by their depth, and allow users to expand sections for these DAGs, > ideally prefetch deeper tasks once the initial set of tasks has rendered ) > 2. Identify bottlenecks/performance issues in both the frontend/backend for > rendering DAGs on the webserver and fix them. Airflow should be more > performant for displaying DAGs that are somewhat large, e.g. DAGs that have > up to 500 nodes and 2000 edges (dependencies from one task to another) should > render within a couple of seconds. > 3. Make DAG loading asynchronous in the UI (once the top-level tasks have > loaded display them immediately). We might not want to do this as users might > try to click something only to have the UI change from underneath them > [~saguziel] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1266) Long task names are truncated in Gantt view
[ https://issues.apache.org/jira/browse/AIRFLOW-1266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-1266. -- Resolution: Fixed > Long task names are truncated in Gantt view > --- > > Key: AIRFLOW-1266 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1266 > Project: Apache Airflow > Issue Type: Bug > Components: webserver > Reporter: Dan Davydov > Fix For: 1.8.2 > > > Long task names are truncated in Gantt view; we should make the text smaller > instead. To reproduce, create a task called > abczzzdef and note that the start/end characters > are not fully visible in the Gantt view. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov reassigned AIRFLOW-1311: Assignee: Edgar Rodriguez (was: Dan Davydov) > Improve Webserver Load Time For Large DAGs > -- > > Key: AIRFLOW-1311 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1311 > Project: Apache Airflow > Issue Type: Bug > Components: webserver > Reporter: Dan Davydov >Assignee: Edgar Rodriguez > > Large DAGs can take an extremely long time to load in the Airflow UI > (minutes/timeout). > The fixes are as follows: > 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing > tasks by their depth, and allow users to expand sections for these DAGs, > ideally prefetch deeper tasks once the initial set of tasks has rendered ) > 2. Identify bottlenecks/performance issues in both the frontend/backend for > rendering DAGs on the webserver and fix them. Airflow should be more > performant for displaying DAGs that are somewhat large, e.g. DAGs that have > up to 500 nodes and 2000 edges (dependencies from one task to another) should > render within a couple of seconds. > 3. Make DAG loading asynchronous in the UI (once the top-level tasks have > loaded display them immediately). We might not want to do this as users might > try to click something only to have the UI change from underneath them > [~saguziel] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov reassigned AIRFLOW-1311: Assignee: Dan Davydov > Improve Webserver Load Time For Large DAGs > -- > > Key: AIRFLOW-1311 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1311 > Project: Apache Airflow > Issue Type: Bug > Components: webserver > Reporter: Dan Davydov > Assignee: Dan Davydov > > Large DAGs can take an extremely long time to load in the Airflow UI > (minutes/timeout). > The fixes are as follows: > 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing > tasks by their depth, and allow users to expand sections for these DAGs, > ideally prefetch deeper tasks once the initial set of tasks has rendered ) > 2. Identify bottlenecks/performance issues in both the frontend/backend for > rendering DAGs on the webserver and fix them. Airflow should be more > performant for displaying DAGs that are somewhat large, e.g. DAGs that have > up to 500 nodes and 2000 edges (dependencies from one task to another) should > render within a couple of seconds. > 3. Make DAG loading asynchronous in the UI (once the top-level tasks have > loaded display them immediately). We might not want to do this as users might > try to click something only to have the UI change from underneath them > [~saguziel] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1442) Airflow Ignore All Deps Command Generation Has an Extra Space
[ https://issues.apache.org/jira/browse/AIRFLOW-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-1442. -- Resolution: Fixed > Airflow Ignore All Deps Command Generation Has an Extra Space > - > > Key: AIRFLOW-1442 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1442 > Project: Apache Airflow > Issue Type: Bug > Reporter: Dan Davydov > Assignee: Dan Davydov > > Airflow Ignore All Deps Command Generation Has an Extra Space. This causes > airflow commands e.g. generated by the webserver to fail. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1495] Fix migration on index on job_id
Repository: incubator-airflow Updated Branches: refs/heads/master 04bfba3aa -> 67b47c958 [AIRFLOW-1495] Fix migration on index on job_id There was a merge conflict on the migration hash for down revision at the time that two commits including migrations were merged. This commit restores the chain of revisions for the migrations, pointing to the last one. The job_id index migration was regenerated from the top migration. Closes #2524 from edgarRd/erod-ti-jobid-index-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67b47c95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67b47c95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67b47c95 Branch: refs/heads/master Commit: 67b47c958903a2297916b44e97adc289d6184b5a Parents: 04bfba3 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Aug 15 15:27:04 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Aug 15 15:27:06 2017 -0700 -- .../7171349d4c73_add_ti_job_id_index.py | 38 .../947454bf1dff_add_ti_job_id_index.py | 38 2 files changed, 38 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b47c95/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py -- diff --git a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py deleted file mode 100644 index b7e2be6..000 --- a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""add ti job_id index - -Revision ID: 7171349d4c73 -Revises: cc1e65623dc7 -Create Date: 2017-08-14 18:08:50.196042 - -""" - -# revision identifiers, used by Alembic. -revision = '7171349d4c73' -down_revision = 'cc1e65623dc7' -branch_labels = None -depends_on = None - -from alembic import op -import sqlalchemy as sa - - -def upgrade(): -op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False) - - -def downgrade(): -op.drop_index('ti_job_id', table_name='task_instance') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b47c95/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py -- diff --git a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py new file mode 100644 index 000..b0817c3 --- /dev/null +++ b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""add ti job_id index + +Revision ID: 947454bf1dff +Revises: bdaa763e6c56 +Create Date: 2017-08-15 15:12:13.845074 + +""" + +# revision identifiers, used by Alembic. +revision = '947454bf1dff' +down_revision = 'bdaa763e6c56' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): +op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False) + + +def downgrade(): +op.drop_index('ti_job_id', table_name='task_instance')
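The bug fixed here is that Alembic migrations form a linked list via `down_revision`; when two PRs with migrations merged, two revisions pointed at different parents and the chain had two heads. Regenerating the migration re-linearized it. A small sketch of how such a linear chain can be ordered from `(revision, down_revision)` pairs (the helper function is illustrative, not part of Alembic, and the middle hash is assumed for the example):

```python
def order_revisions(revisions):
    """Order Alembic-style (revision, down_revision) pairs oldest-first.

    Assumes a single linear chain, which is what Alembic requires and
    what this commit restored.
    """
    down_of = dict(revisions)
    parents = set(down_of.values())
    # The head is the one revision that no other revision points down to.
    head = next(rev for rev in down_of if rev not in parents)
    chain = []
    cur = head
    while cur is not None:          # walk head -> root via down_revision
        chain.append(cur)
        cur = down_of.get(cur)
    return list(reversed(chain))    # oldest first


# Hashes standing in for the real migration ids in this thread:
revs = [('947454bf1dff', 'bdaa763e6c56'),
        ('bdaa763e6c56', 'cc1e65623dc7'),
        ('cc1e65623dc7', None)]
print(order_revisions(revs))
# ['cc1e65623dc7', 'bdaa763e6c56', '947454bf1dff']
```

With the conflicting `7171349d4c73` revision, two entries would have shared the same `down_revision`, and Alembic would report multiple heads instead of a single chain.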
incubator-airflow git commit: [AIRFLOW-1483] Making page size consistent in list
Repository: incubator-airflow Updated Branches: refs/heads/master e1772c008 -> 04bfba3aa [AIRFLOW-1483] Making page size consistent in list Views showing model listings had large page sizes which made page loading really slow client-side, mostly due to DOM processing and JS plugin rendering. Also, the page size was inconsistent across some listings. This commit introduces a configurable page size, and by default it'll use a page_size = 100. Also, the same page size is applied to all the model views controlled by flask_admin to be consistent. Closes #2497 from edgarRd/erod-ui-page-size-conf Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/04bfba3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/04bfba3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/04bfba3a Branch: refs/heads/master Commit: 04bfba3aa97deab850c14763279d33a6dfceb205 Parents: e1772c0 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Aug 15 15:01:17 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Aug 15 15:01:19 2017 -0700 -- airflow/config_templates/default_airflow.cfg | 3 +++ airflow/config_templates/default_test.cfg| 1 + airflow/www/views.py | 9 + 3 files changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index b568d3a..948c72c 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -233,6 +233,9 @@ log_fetch_timeout_sec = 5 # DAGs by default hide_paused_dags_by_default = False +# Consistent page size across all listing views in the UI +page_size = 100 + [email] email_backend = airflow.utils.email.send_email_smtp 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/config_templates/default_test.cfg -- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 4452ffa..2a090d4 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -57,6 +57,7 @@ dag_orientation = LR dag_default_view = tree log_fetch_timeout_sec = 5 hide_paused_dags_by_default = False +page_size = 100 [email] email_backend = airflow.utils.email.send_email_smtp http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index f813a0b..80b9dd3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -91,6 +91,8 @@ logout_user = airflow.login.logout_user FILTER_BY_OWNER = False +PAGE_SIZE = conf.getint('webserver', 'page_size') + if conf.getboolean('webserver', 'FILTER_BY_OWNER'): # filter_by_owner if authentication is enabled and filter_by_owner is true FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED'] @@ -1966,7 +1968,7 @@ class AirflowModelView(ModelView): edit_template = 'airflow/model_edit.html' create_template = 'airflow/model_create.html' column_display_actions = True -page_size = 500 +page_size = PAGE_SIZE class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView): @@ -2283,7 +2285,6 @@ class VariableView(wwwutils.DataProfilingMixin, AirflowModelView): class XComView(wwwutils.SuperUserMixin, AirflowModelView): verbose_name = "XCom" verbose_name_plural = "XComs" -page_size = 20 form_columns = ( 'key', @@ -2438,7 +2439,7 @@ class TaskInstanceModelView(ModelViewOnly): 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url') can_delete = True -page_size = 500 +page_size = PAGE_SIZE @action('set_running', "Set state to 'running'", None) def action_set_running(self, ids): @@ -2704,7 +2705,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView): ) 
can_delete = False can_create = False -page_size = 50 +page_size = PAGE_SIZE list_template = 'airflow/list_dags.html' named_filter_urls = True
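The pattern in `views.py` above reads the page size once at import time and reuses it across all model views. A self-contained sketch of that config read using the stdlib `configparser` (the inline config text is a stand-in for `airflow.cfg`; `fallback` shows how a missing option could keep an old default):

```python
import configparser

# Stand-in for the [webserver] section of airflow.cfg.
cfg_text = """
[webserver]
page_size = 100
"""

conf = configparser.ConfigParser()
conf.read_string(cfg_text)

# Mirrors views.py: read once, share the constant across all list views.
PAGE_SIZE = conf.getint('webserver', 'page_size')
print(PAGE_SIZE)  # 100

# An option absent from the user's config can fall back to the old default.
legacy = conf.getint('webserver', 'missing_option', fallback=500)
print(legacy)  # 500
```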
incubator-airflow git commit: [AIRFLOW-1495] Add TaskInstance index on job_id
Repository: incubator-airflow Updated Branches: refs/heads/master 4cf904cf5 -> e1772c008 [AIRFLOW-1495] Add TaskInstance index on job_id Column job_id is unindexed in TaskInstance, it was used as default sort column in TaskInstanceView. This commit adds the required migration to add the index on task_instance.job_id on future db upgrades. Closes #2520 from edgarRd/erod-ti-jobid-index Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1772c00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1772c00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1772c00 Branch: refs/heads/master Commit: e1772c008d607a2545ddaa05508b1a74473be0ec Parents: 4cf904c Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Tue Aug 15 14:57:26 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Aug 15 14:57:28 2017 -0700 -- .../7171349d4c73_add_ti_job_id_index.py | 38 1 file changed, 38 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1772c00/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py -- diff --git a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py new file mode 100644 index 000..b7e2be6 --- /dev/null +++ b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""add ti job_id index + +Revision ID: 7171349d4c73 +Revises: cc1e65623dc7 +Create Date: 2017-08-14 18:08:50.196042 + +""" + +# revision identifiers, used by Alembic. +revision = '7171349d4c73' +down_revision = 'cc1e65623dc7' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): +op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False) + + +def downgrade(): +op.drop_index('ti_job_id', table_name='task_instance')
[jira] [Created] (AIRFLOW-1513) Come up with an abstraction to set defaults per task
Dan Davydov created AIRFLOW-1513: Summary: Come up with an abstraction to set defaults per task Key: AIRFLOW-1513 URL: https://issues.apache.org/jira/browse/AIRFLOW-1513 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Come up with an abstraction to set defaults per task, e.g. all HiveOperator tasks will have some specific retry policy that can be set globally by Airflow users. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-855] Replace PickleType with LargeBinary in XCom
Repository: incubator-airflow Updated Branches: refs/heads/master 984a87c0c -> 4cf904cf5 [AIRFLOW-855] Replace PickleType with LargeBinary in XCom PickleType in Xcom allows remote code execution. In order to deprecate it without changing mysql table schema, change PickleType to LargeBinary because they both map to the blob type in mysql. Add "enable_pickling" to function signature to control using either the pickle type or JSON. "enable_pickling" should also be added to core section of airflow.cfg Picked up where https://github.com/apache/incubator-airflow/pull/2132 left off. Took this PR, fixed merge conflicts, added documentation/tests, fixed broken tests/operators, and fixed the python3 issues. Closes #2518 from aoen/disable-pickle-type Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cf904cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cf904cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cf904cf Branch: refs/heads/master Commit: 4cf904cf5a7a070bbeaf3a0e985ed2b840276015 Parents: 984a87c Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Tue Aug 15 12:24:02 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Aug 15 12:24:07 2017 -0700 -- UPDATING.md | 6 + airflow/config_templates/default_airflow.cfg| 4 + airflow/config_templates/default_test.cfg | 1 + airflow/contrib/operators/ssh_operator.py | 10 +- ...c56_make_xcom_value_column_a_large_binary.py | 45 airflow/models.py | 110 ++- tests/contrib/operators/test_sftp_operator.py | 84 +- tests/contrib/operators/test_ssh_operator.py| 24 +++- tests/models.py | 87 +++ 9 files changed, 338 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/UPDATING.md -- diff --git a/UPDATING.md b/UPDATING.md index 3a880ab..92ee4b4 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -32,6 +32,12 @@ supported and will be removed
entirely in Airflow 2.0 - `contrib.hooks.gcp_dataflow_hook.DataFlowHook` starts to use `--runner=DataflowRunner` instead of `DataflowPipelineRunner`, which is removed from the package `google-cloud-dataflow-0.6.0`. +- The pickle type for XCom messages has been replaced by json to prevent RCE attacks. + Note that JSON serialization is stricter than pickling, so if you want to e.g. pass + raw bytes through XCom you must encode them using an encoding like base64. + By default pickling is still enabled until Airflow 2.0. To disable it + Set enable_xcom_pickling = False in your Airflow config. + ## Airflow 1.8.1 The Airflow package name was changed from `airflow` to `apache-airflow` during this release. You must uninstall your http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index dcb99ed..b568d3a 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -129,6 +129,10 @@ logging_config_path = # Default to use file task handler. task_log_reader = file.task +# Whether to enable pickling for xcom (note that this is insecure and allows for +# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False). +enable_xcom_pickling = True + [cli] # In what way should the cli access the API. 
The LocalClient will use the # database directly, while the json_client will use the api running on the http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_test.cfg -- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 88b19a5..4452ffa 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -37,6 +37,7 @@ dag_concurrency = 16 dags_are_paused_at_creation = False fernet_key = {FERNET_KEY} non_pooled_task_slot_count = 128 +enable_xcom_pickling = False [cli] api_client = airflow.api.client.local_client http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/contrib/operators/ssh_operator.py -- diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operat
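The UPDATING.md note above says that with pickling disabled, XCom values go through JSON, which is stricter than pickle: raw bytes must be base64-encoded first. A minimal sketch of that round trip using only the stdlib (the dict keys are illustrative, not Airflow's XCom schema):

```python
import base64
import json

raw = b'\x00\x01binary payload\xff'  # bytes that JSON cannot carry directly

# Producer side: encode bytes to an ASCII-safe string before serializing.
payload = base64.b64encode(raw).decode('ascii')
message = json.dumps({'key': 'my_xcom', 'value': payload})

# Consumer side: deserialize, then decode back to the original bytes.
value = json.loads(message)['value']
restored = base64.b64decode(value)
assert restored == raw
```

Unlike pickle, nothing in this path can execute code on deserialization, which is the RCE concern the commit addresses.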
incubator-airflow git commit: [AIRFLOW-1239] Fix unicode error for logs in base_task_runner
Repository: incubator-airflow Updated Branches: refs/heads/master 565423a39 -> 42cad6069 [AIRFLOW-1239] Fix unicode error for logs in base_task_runner The details here are that there exists a PR for this JIRA already (https://github.com/apache/incubator- airflow/pull/2318). The issue is that in python 2.7 not all literals are automatically unicode like they are in python 3. That's what's the root cause, and that can simply be fixed by just explicitly stating all literals should be treated as unicode, which is an import from the `__future__` module. https://stackoverflow.com/questions/3235386/python-using-format-on- a-unicode-escaped-string also explains this same solution, which I found helpful. Closes #2496 from Acehaidrey/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/42cad606 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/42cad606 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/42cad606 Branch: refs/heads/master Commit: 42cad60698646f617537a7f4ba713fd6b3fc2ecb Parents: 565423a Author: Ace Haidrey <ahaid...@pandora.com> Authored: Mon Aug 14 15:13:01 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Aug 14 15:13:02 2017 -0700 -- airflow/task_runner/base_task_runner.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/42cad606/airflow/task_runner/base_task_runner.py -- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index 7229be5..bed8eaa 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import unicode_literals import getpass import os
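The class of error fixed here — formatting log lines that mix byte strings with non-ASCII text under Python 2 — can also be avoided by decoding subprocess output explicitly before formatting. A hedged sketch (the helper is illustrative; Airflow's actual fix is the `unicode_literals` import shown in the diff):

```python
def format_log_line(task_name, raw_line):
    # base_task_runner reads task output from a subprocess pipe, which yields
    # bytes; decode before formatting so bytes and text never mix.
    if isinstance(raw_line, bytes):
        raw_line = raw_line.decode("utf-8", errors="replace")
    return "Subtask {}: {}".format(task_name, raw_line.rstrip("\n"))
```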
incubator-airflow git commit: [AIRFLOW-1452] workaround lock on method
Repository: incubator-airflow Updated Branches: refs/heads/master b0669b532 -> 0d0cc62f4 [AIRFLOW-1452] workaround lock on method Workaround lock on method "has_table" in case mssql is used as storage engine. Closes #2514 from patsak/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d0cc62f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d0cc62f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d0cc62f Branch: refs/heads/master Commit: 0d0cc62f49525166bc877606affa5a623ba52c4d Parents: b0669b5 Author: k.privezentsev <konstantin.privezent...@kaspersky.com> Authored: Fri Aug 11 11:47:35 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Aug 11 11:47:42 2017 -0700 -- .../cc1e65623dc7_add_max_tries_column_to_task_instance.py | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0d0cc62f/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py -- diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py index 2d5ffc2..b151e0c 100644 --- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -29,6 +29,7 @@ from alembic import op import sqlalchemy as sa from airflow import settings from airflow.models import DagBag, TaskInstance +from sqlalchemy.engine.reflection import Inspector BATCH_SIZE = 5000 @@ -39,10 +40,12 @@ def upgrade(): # needed for database that does not create table until migration finishes. # Checking task_instance table exists prevent the error of querying # non-existing task_instance table. 
-engine = settings.engine -if engine.dialect.has_table(engine, 'task_instance'): +connection = op.get_bind() +inspector = Inspector.from_engine(connection) +tables = inspector.get_table_names() + +if 'task_instance' in tables: # Get current session -connection = op.get_bind() sessionmaker = sa.orm.sessionmaker() session = sessionmaker(bind=connection) dagbag = DagBag(settings.DAGS_FOLDER)
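The migration's guard — enumerate the existing tables once and only query `task_instance` if it is present — avoids the lock that `engine.dialect.has_table` triggered on MSSQL. The pattern is sketched below with stdlib `sqlite3` instead of SQLAlchemy's `Inspector`, purely for illustration:

```python
import sqlite3

def table_exists(conn, name):
    # List all tables up front (as Inspector.get_table_names() does) rather
    # than probing a single table, which is what acquired the MSSQL lock.
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return name in {row[0] for row in rows}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY)")
```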
incubator-airflow git commit: [AIRFLOW-1385] Make Airflow task logging configurable
Repository: incubator-airflow Updated Branches: refs/heads/master 0bc248fc7 -> b0669b532 [AIRFLOW-1385] Make Airflow task logging configurable This PR adds configurable task logging to Airflow. Please refer to #2422 for previous discussions. This is the first step of making entire Airflow logging configurable ([AIRFLOW-1454](https://issue s.apache.org/jira/browse/AIRFLOW-1454)). Closes #2464 from AllisonWang/allison--log- abstraction Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0669b53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0669b53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0669b53 Branch: refs/heads/master Commit: b0669b532a7be9aa34a4390951deaa25897c62e6 Parents: 0bc248f Author: AllisonWang <allisonwang...@gmail.com> Authored: Fri Aug 11 11:38:37 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Aug 11 11:38:39 2017 -0700 -- airflow/bin/cli.py | 106 ++--- airflow/config_templates/__init__.py| 13 ++ airflow/config_templates/default_airflow.cfg| 7 + .../config_templates/default_airflow_logging.py | 94 airflow/dag/__init__.py | 1 - airflow/settings.py | 14 ++ airflow/task_runner/base_task_runner.py | 2 + airflow/utils/log/__init__.py | 13 ++ airflow/utils/log/file_task_handler.py | 176 +++ airflow/utils/log/gcs_task_handler.py | 95 airflow/utils/log/s3_task_handler.py| 91 airflow/utils/logging.py| 47 +--- airflow/www/views.py| 225 +++ tests/utils/test_log_handlers.py| 73 ++ tests/utils/test_logging.py | 17 +- 15 files changed, 687 insertions(+), 287 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 077cb90..e9e60cb 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,7 +22,6 @@ import os import socket import subprocess import textwrap -import warnings from 
importlib import import_module import argparse @@ -54,8 +53,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance, from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from airflow.utils import logging as logging_utils -from airflow.utils.file import mkdirs from airflow.www.app import cached_app from sqlalchemy import func @@ -357,61 +354,22 @@ def run(args, dag=None): ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() -logging.root.handlers = [] +logger = logging.getLogger('airflow.task') if args.raw: -# Output to STDOUT for the parent process to read and log -logging.basicConfig( -stream=sys.stdout, -level=settings.LOGGING_LEVEL, -format=settings.LOG_FORMAT) -else: -# Setting up logging to a file. - -# To handle log writing when tasks are impersonated, the log files need to -# be writable by the user that runs the Airflow command and the user -# that is impersonated. This is mainly to handle corner cases with the -# SubDagOperator. When the SubDagOperator is run, all of the operators -# run under the impersonated user and create appropriate log files -# as the impersonated user. However, if the user manually runs tasks -# of the SubDagOperator through the UI, then the log files are created -# by the user that runs the Airflow command. For example, the Airflow -# run command may be run by the `airflow_sudoable` user, but the Airflow -# tasks may be run by the `airflow` user. If the log files are not -# writable by both users, then it's possible that re-running a task -# via the UI (or vice versa) results in a permission error as the task -# tries to write to a log file created by the other user. 
-try_number = ti.try_number -log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) -log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id, - args.execution_date) -directory = os.path.join(log_base, log_relative_dir) -# Create the log file and give it group writable permissions -# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag -
[jira] [Resolved] (AIRFLOW-1349) Max active dagrun check for backfills shouldn't include the backfilled dagrun
[ https://issues.apache.org/jira/browse/AIRFLOW-1349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-1349. -- Resolution: Fixed > Max active dagrun check for backfills shouldn't include the backfilled dagrun > - > > Key: AIRFLOW-1349 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1349 > Project: Apache Airflow > Issue Type: Bug > Components: backfill > Reporter: Dan Davydov >Assignee: Edgar Rodriguez > > When you backfill a dag with e.g. 1 max active dagrun, if that dagrun is > already running then it shouldn't count against the max active dagruns of the > backfill and make the backfill fail. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1443] Update Airflow configuration documentation
Repository: incubator-airflow Updated Branches: refs/heads/master d9109d645 -> 6825d97b8 [AIRFLOW-1443] Update Airflow configuration documentation This PR updates Airflow configuration documentations to include a recent change to split task logs by try number #2383. Closes #2467 from AllisonWang/allison--update-doc Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6825d97b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6825d97b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6825d97b Branch: refs/heads/master Commit: 6825d97b82a3b235685ea8265380a20eea90c990 Parents: d9109d6 Author: AllisonWang <allisonwang...@gmail.com> Authored: Wed Aug 9 14:49:54 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Aug 9 14:49:56 2017 -0700 -- UPDATING.md| 29 - docs/configuration.rst | 15 --- 2 files changed, 24 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6825d97b/UPDATING.md -- diff --git a/UPDATING.md b/UPDATING.md index a02ff04..3a880ab 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -9,8 +9,11 @@ assists people when migrating to a new version. SSH Hook now uses Paramiko library to create ssh client connection, instead of sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible. - update SSHHook constructor - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer test_ssh_operator.py for usage info. - - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer test_sftp_operator.py.py for usage info. - - No updates are required if you are using ftpHook, it will continue work as is. + - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer test_sftp_operator.py.py for usage info. 
+ - No updates are required if you are using ftpHook, it will continue to work as is. + +### Logging update + Logs now are stored in the log folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``. ### New Features @@ -61,8 +64,8 @@ interfere. Please read through these options, defaults have changed since 1.7.1. child_process_log_directory -In order to increase the robustness of the scheduler, DAGs are now processed in their own process. Therefore each -DAG has its own log file for the scheduler. These are placed in `child_process_log_directory` which defaults to +In order to increase the robustness of the scheduler, DAGs are now processed in their own process. Therefore each +DAG has its own log file for the scheduler. These are placed in `child_process_log_directory` which defaults to `/scheduler/latest`. You will need to make sure these log files are removed. > DAG logs or processor logs ignore any command line settings for log file > locations. @@ -72,7 +75,7 @@ Previously the command line option `num_runs` was used to let the scheduler term loops. This is now time bound and defaults to `-1`, which means run continuously. See also num_runs. num_runs -Previously `num_runs` was used to let the scheduler terminate after a certain amount of loops. Now num_runs specifies +Previously `num_runs` was used to let the scheduler terminate after a certain amount of loops. Now num_runs specifies the number of times to try to schedule each DAG file within `run_duration` time. Defaults to `-1`, which means try indefinitely. This is only available on the command line. @@ -85,7 +88,7 @@ dags are not being picked up, have a look at this number and decrease it when ne catchup_by_default By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date. -This setting changes that behavior to only execute the latest interval.
This can also be specified per DAG as +This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as `catchup = False / True`. Command line backfills will still work. ### Faulty Dags do not show an error in the Web UI @@ -109,33 +112,33 @@ convenience variables to the config. In case you run a secure Hadoop setup it might be required to whitelist these variables by adding the following to your configuration: ``` - + hive.security.authorization.sqlstd.confwhitelist.append airflow\.ctx\..* ``` ### Google Cloud Operator and Hook alignment -All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection +All Google Cloud Operators and Hooks are aligned and use the s
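The per-try log layout documented in this change (``{dag_id}/{task_id}/{execution_date}/{try_number}.log``) can be sketched as a tiny path builder — the helper name is illustrative, not Airflow's API:

```python
def task_log_path(base_log_folder, dag_id, task_id, execution_date, try_number):
    # Mirrors the documented layout:
    # {dag_id}/{task_id}/{execution_date}/{try_number}.log
    return "{}/{}/{}/{}/{}.log".format(
        base_log_folder, dag_id, task_id, execution_date, try_number
    )
```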
[jira] [Closed] (AIRFLOW-672) Allow logs to be piped into another process
[ https://issues.apache.org/jira/browse/AIRFLOW-672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov closed AIRFLOW-672. --- Resolution: Fixed Not really relevant with the new logging abstraction anymore. We can create a piping to a file logging handler if we want now. > Allow logs to be piped into another process > --- > > Key: AIRFLOW-672 > URL: https://issues.apache.org/jira/browse/AIRFLOW-672 > Project: Apache Airflow > Issue Type: Improvement > Components: logging > Reporter: Dan Davydov > > Instead of writing logs to a file Airflow should be able to pipe them to a > process similar to how Apache Webserver does it: > https://httpd.apache.org/docs/1.3/logs.html#piped . The most important reason > for this is to allow log rotation of tasks which can get quite large on the > local worker disks. Part of this task could be moving the existing s3 log > exporting logic (and other custom logging logic) out of the airflow core. > FWIW I have the code ready to add log rotation to Airflow itself, but this > isn't as ideal as the piping solution (separation of concerns). If there is > interest in getting this merged as a temporary measure I can do so. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1486] Unexpected S3 writing log error
Repository: incubator-airflow Updated Branches: refs/heads/master f5c845739 -> d9109d645 [AIRFLOW-1486] Unexpected S3 writing log error Removed unexpected S3 writing log error and added tests for s3 logging. Closes #2499 from skudriashev/airflow-1486 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9109d64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9109d64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9109d64 Branch: refs/heads/master Commit: d9109d6458d136cd2b76ef7180be498ae09b3ea3 Parents: f5c8457 Author: Stanislav Kudriashev <stas.kudrias...@gmail.com> Authored: Mon Aug 7 15:52:51 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Aug 7 15:52:54 2017 -0700 -- airflow/utils/logging.py| 40 +++ tests/utils/test_logging.py | 107 +++ 2 files changed, 117 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/airflow/utils/logging.py -- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index b86d839..6e18e52 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -91,10 +91,13 @@ class S3Log(object): except: pass -# raise/return error if we get here -err = 'Could not read logs from {}'.format(remote_log_location) -logging.error(err) -return err if return_error else '' +# return error if needed +if return_error: +msg = 'Could not read logs from {}'.format(remote_log_location) +logging.error(msg) +return msg + +return '' def write(self, log, remote_log_location, append=True): """ @@ -108,25 +111,21 @@ class S3Log(object): :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. 
:type append: bool - """ if self.hook: - if append: old_log = self.read(remote_log_location) -log = old_log + '\n' + log +log = '\n'.join([old_log, log]) + try: self.hook.load_string( log, key=remote_log_location, replace=True, -encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS')) -return +encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'), +) except: -pass - -# raise/return error if we get here -logging.error('Could not write logs to {}'.format(remote_log_location)) +logging.error('Could not write logs to {}'.format(remote_log_location)) class GCSLog(object): @@ -183,10 +182,13 @@ class GCSLog(object): except: pass -# raise/return error if we get here -err = 'Could not read logs from {}'.format(remote_log_location) -logging.error(err) -return err if return_error else '' +# return error if needed +if return_error: +msg = 'Could not read logs from {}'.format(remote_log_location) +logging.error(msg) +return msg + +return '' def write(self, log, remote_log_location, append=True): """ @@ -200,12 +202,11 @@ class GCSLog(object): :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. :type append: bool - """ if self.hook: if append: old_log = self.read(remote_log_location) -log = old_log + '\n' + log +log = '\n'.join([old_log, log]) try: bkt, blob = self.parse_gcs_url(remote_log_location) @@ -218,7 +219,6 @@ class GCSLog(object): tmpfile.flush() self.hook.upload(bkt, blob, tmpfile.name) except: -# raise/return error if we get here logging.error('Could not write logs to {}'.format(remote_log_location)) def parse_gcs_url(self, gsurl): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/tests/utils/test_logging.py -- diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 474430f..3f9f5d6 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.p
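The refactor above changes `read` so an error string is produced and logged only when the caller opts in via `return_error`. The control flow can be distilled as follows (a sketch with a pluggable fetch callable standing in for the real S3/GCS hook):

```python
import logging

def read_remote_log(fetch, location, return_error=False):
    # fetch is any callable that returns the log body or raises on failure.
    try:
        return fetch(location)
    except Exception:
        pass
    # Only surface the failure as a string when the caller asks for it,
    # mirroring the S3Log/GCSLog change above; otherwise return ''.
    if return_error:
        msg = 'Could not read logs from {}'.format(location)
        logging.error(msg)
        return msg
    return ''

def failing_fetch(location):
    # Stand-in for an S3/GCS read that raises (e.g. missing key, no access).
    raise IOError('cannot reach {}'.format(location))
```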
incubator-airflow git commit: [AIRFLOW-1487] Added links to all companies officially using Airflow
Repository: incubator-airflow Updated Branches: refs/heads/master 111ce574c -> f5c845739 [AIRFLOW-1487] Added links to all companies officially using Airflow Closes #2458 from tanmaythakur/patch-1 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f5c84573 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f5c84573 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f5c84573 Branch: refs/heads/master Commit: f5c845739c8ca69364958dd696603879d87b5b58 Parents: 111ce57 Author: Tanmay <tanmaythak...@gmail.com> Authored: Mon Aug 7 11:39:10 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Aug 7 11:39:36 2017 -0700 -- README.md | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f5c84573/README.md -- diff --git a/README.md b/README.md index a6a048d..3c85942 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Currently **officially** using Airflow: 1. [Bellhops](https://github.com/bellhops) 1. [BlaBlaCar](https://www.blablacar.com) [[@puckel](https://github.com/puckel) & [@wmorin](https://github.com/wmorin)] 1. [Bloc](https://www.bloc.io) [[@dpaola2](https://github.com/dpaola2)] -1. BlueApron [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)] +1. [BlueApron](https://www.blueapron.com) [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)] 1. [Blue Yonder](http://www.blue-yonder.com) [[@blue-yonder](https://github.com/blue-yonder)] 1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) & [@chadcelect](https://github.com/chadcelect)] 1. [Change.org](https://www.change.org) [[@change](https://github.com/change), [@vijaykramesh](https://github.com/vijaykramesh)] @@ -103,7 +103,7 @@ Currently **officially** using Airflow: 1. 
[City of San Diego](http://sandiego.gov) [[@MrMaksimize](https://github.com/mrmaksimize), [@andrell81](https://github.com/andrell81) & [@arnaudvedy](https://github.com/arnaudvedy)] 1. [Clairvoyant](https://clairvoyantsoft.com) [@shekharv](https://github.com/shekharv) 1. [Clover Health](https://www.cloverhealth.com) [[@gwax](https://github.com/gwax) & [@vansivallab](https://github.com/vansivallab)] -1. Chartboost [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)] +1. [Chartboost](https://www.chartboost.com) [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)] 1. [Cotap](https://github.com/cotap/) [[@maraca](https://github.com/maraca) & [@richardchew](https://github.com/richardchew)] 1. [Credit Karma](https://www.creditkarma.com/) [[@preete-dixit-ck](https://github.com/preete-dixit-ck) & [@harish-gaggar-ck](https://github.com/harish-gaggar-ck) & [@greg-finley-ck](https://github.com/greg-finley-ck)] 1. [Digital First Media](http://www.digitalfirstmedia.com/) [[@duffn](https://github.com/duffn) & [@mschmo](https://github.com/mschmo) & [@seanmuth](https://github.com/seanmuth)] @@ -164,7 +164,7 @@ Currently **officially** using Airflow: 1. [SmartNews](https://www.smartnews.com/) [[@takus](https://github.com/takus)] 1. [Spotify](https://github.com/spotify) [[@znichols](https://github.com/znichols)] 1. [Stackspace](https://beta.stackspace.io/) -1. Stripe [[@jbalogh](https://github.com/jbalogh)] +1. [Stripe](https://stripe.com) [[@jbalogh](https://github.com/jbalogh)] 1. [Tails.com](https://tails.com/) [[@alanmcruickshank](https://github.com/alanmcruickshank)] 1. [Thumbtack](https://www.thumbtack.com/) [[@natekupp](https://github.com/natekupp)] 1. [Tictail](https://tictail.com/) @@ -176,9 +176,9 @@ Currently **officially** using Airflow: 1. [WeTransfer](https://github.com/WeTransfer) [[@jochem](https://github.com/jochem)] 1. [Whistle Labs](http://www.whistle.com) [[@ananya77041](https://github.com/ananya77041)] 1. 
[WiseBanyan](https://wisebanyan.com/) -1. Wooga -1. Xoom [[@gepser](https://github.com/gepser) & [@omarvides](https://github.com/omarvides)] -1. Yahoo! +1. [Wooga](https://www.wooga.com/) +1. [Xoom](https://www.xoom.com/india/send-money) [[@gepser](https://github.com/gepser) & [@omarvides](https://github.com/omarvides)] +1. [Yahoo!](https://www.yahoo.com/) 1. [Zapier](https://www.zapier.com) [[@drknexus](https://github.com/drknexus) & [@statwonk](https://github.com/statwonk)] 1. [Zendesk](https://www.github.com/zendesk) 1. [Zenly](https://zen.ly) [[@cerisier](https://github.com/cerisier) & [@jbdalido](https://github.com/jbdalido)]
incubator-airflow git commit: [AIRFLOW-1349] Fix backfill to respect limits
Repository: incubator-airflow Updated Branches: refs/heads/master 651e6063d -> ddc502694 [AIRFLOW-1349] Fix backfill to respect limits Before, if a backfill job was triggered that would include a dag run already in a RUNNING state, the dag run within the backfill would be included in the count against the max_active_runs limit. Also, if a backfill job generated multiple dag runs it could potentially violate max_active_runs limits by executing all dag runs. Now the limit is checked per dag run to be created, and the backfill job will only run the dag runs within the backfill job that can be included within the limits. Also, if the max_active_runs limit has already been reached, the BackfillJob will wait and loop, trying to create the required dag runs as soon as a dag run slot within the limit is available, until all dag runs are completed. These changes provide more consistent behavior according to the max_active_runs limit definition and allow the user to run backfill jobs that include dag runs already in a RUNNING state, as long as they fit within the limits.
Closes #2454 from edgarRd/erod-fix-backfill-max Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ddc50269 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ddc50269 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ddc50269 Branch: refs/heads/master Commit: ddc50269431d8715e0eaeed7be5f522fed5521da Parents: 651e606 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Fri Aug 4 14:08:17 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Aug 4 14:08:20 2017 -0700 -- airflow/bin/cli.py | 13 - airflow/jobs.py| 144 +--- airflow/models.py | 51 - tests/jobs.py | 140 +- 4 files changed, 271 insertions(+), 77 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index dc49bb7..077cb90 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -179,7 +179,8 @@ def backfill(args, dag=None): conf.getboolean('core', 'donot_pickle')), ignore_first_depends_on_past=args.ignore_first_depends_on_past, ignore_task_deps=args.ignore_dependencies, -pool=args.pool) +pool=args.pool, +delay_on_limit_secs=args.delay_on_limit) def trigger_dag(args): @@ -1234,6 +1235,14 @@ class CLIFactory(object): "DO respect depends_on_past)."), "store_true"), 'pool': Arg(("--pool",), "Resource pool to use"), +'delay_on_limit': Arg( +("--delay_on_limit",), +help=("Amount of time in seconds to wait when the limit " + "on maximum active dag runs (max_active_runs) has " + "been reached before trying to execute a dag run " + "again."), +type=float, +default=1.0), # list_tasks 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), # list_dags @@ -1491,7 +1500,7 @@ class CLIFactory(object): 'dag_id', 'task_regex', 'start_date', 'end_date', 'mark_success', 'local', 'donot_pickle', 'include_adhoc', 'bf_ignore_dependencies', 
'bf_ignore_first_depends_on_past', -'subdir', 'pool', 'dry_run') +'subdir', 'pool', 'delay_on_limit', 'dry_run') }, { 'func': list_tasks, 'help': "List the tasks within a DAG", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 668973e..d94a0e0 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1837,26 +1837,29 @@ class BackfillJob(BaseJob): not_ready=None, deadlocked=None, active_runs=None, + executed_dag_run_dates=None, finished_runs=0, total_runs=0, ): """ :param to_run: Tasks to run in the backfill -:type to_run: dict +:type to_run: dict[Tuple[String, String, DateTime], TaskInstance] :param started: Maps started task instance key to task instance object
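The new `--delay_on_limit` behavior added by this commit — sleep and retry while `max_active_runs` is saturated — can be sketched as a small loop. Function and parameter names here are illustrative, not the actual BackfillJob internals:

```python
import time

def create_runs_within_limit(needed, active_count_fn, create_fn,
                             max_active_runs, delay_on_limit_secs=1.0):
    # Keep creating dag runs until `needed` have been started, sleeping
    # whenever the max_active_runs ceiling has been reached. Assumes some
    # external process eventually completes active runs, freeing slots.
    created = 0
    while created < needed:
        if active_count_fn() < max_active_runs:
            create_fn()
            created += 1
        else:
            time.sleep(delay_on_limit_secs)
    return created
```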
[jira] [Commented] (AIRFLOW-774) dagbag_size/collect_dags/dagbag_import_errors stats incorrect
[ https://issues.apache.org/jira/browse/AIRFLOW-774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111825#comment-16111825 ] Dan Davydov commented on AIRFLOW-774: - Yes to master, IIRC they should appear starting from release 1.8.1. This is the JIRA: https://issues.apache.org/jira/browse/AIRFLOW-780 Yep, only fixed the import errors, not the stats. The changes should be similar though (pull the stats logic out to the top level, which aggregates each individual parsing subprocess's errors). > dagbag_size/collect_dags/dagbag_import_errors stats incorrect > - > > Key: AIRFLOW-774 > URL: https://issues.apache.org/jira/browse/AIRFLOW-774 > Project: Apache Airflow > Issue Type: Bug > Components: logging > Reporter: Dan Davydov > > After the multiprocessor change was made (dag folders are processed in > parallel), the number of dags reported by airflow is for each of these > subprocesses, which is inaccurate and potentially orders of magnitude less > than the actual number of dags. These individual processes' stats should be > aggregated. The collect_dags/dagbag_import_errors stats should also be fixed > (time it takes to parse the dags). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1445] Changing HivePartitionSensor UI color to lighter shade
Repository: incubator-airflow Updated Branches: refs/heads/master 1932ccc88 -> 836f2899c [AIRFLOW-1445] Changing HivePartitionSensor UI color to lighter shade My PR is simply to improve the readability of the text using the HivePartitionSensor. The screen shots below show the before and after. The darker shade (nearly black) is the before, and the purple color is the after. Closes #2476 from Acehaidrey/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/836f2899 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/836f2899 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/836f2899 Branch: refs/heads/master Commit: 836f2899c6fd225fa8da5b7c697a470f2d2f9d58 Parents: 1932ccc Author: Ace Haidrey <ahaid...@pandora.com> Authored: Tue Aug 1 09:20:36 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Aug 1 09:20:39 2017 -0700 -- airflow/operators/sensors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/836f2899/airflow/operators/sensors.py -- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index bfa2ef4..409c18d 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -347,7 +347,7 @@ class HivePartitionSensor(BaseSensorOperator): :type metastore_conn_id: str """ template_fields = ('schema', 'table', 'partition',) -ui_color = '#2b2d42' +ui_color = '#C5CAE9' @apply_defaults def __init__(
incubator-airflow git commit: [AIRFLOW-1349] Refactor BackfillJob _execute
Repository: incubator-airflow Updated Branches: refs/heads/master 322ec9609 -> 547f8184b [AIRFLOW-1349] Refactor BackfillJob _execute BackfillJob._execute is doing multiple things - it is pretty hard to follow and maintain. Changes included are just a re-org of the code, no logic has been changed. Refactor includes: - Break BackfillJob._execute into functions - Add a Status object to track BackfillJob internal status while executing the job. Closes #2463 from edgarRd/erod-backfill-refactor Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/547f8184 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/547f8184 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/547f8184 Branch: refs/heads/master Commit: 547f8184be1e0b3f902faf354fa39579d2f08af2 Parents: 322ec96 Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com> Authored: Fri Jul 28 15:49:50 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Jul 28 15:50:24 2017 -0700 -- airflow/jobs.py | 515 - airflow/models.py | 31 +++ tests/jobs.py | 189 +- 3 files changed, 501 insertions(+), 234 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/547f8184/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index e2f8c94..668973e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1817,6 +1817,62 @@ class BackfillJob(BaseJob): 'polymorphic_identity': 'BackfillJob' } +class _DagRunTaskStatus(object): +""" +Internal status of the backfill job. This class is intended to be instantiated +only within a BackfillJob instance and will track the execution of tasks, +e.g. started, skipped, succeeded, failed, etc. Information about the dag runs +related to the backfill job are also being tracked in this structure, +.e.g finished runs, etc. 
Any other status related information related to the +execution of dag runs / tasks can be included in this structure since it makes +it easier to pass it around. +""" +# TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts +def __init__(self, + to_run=None, + started=None, + skipped=None, + succeeded=None, + failed=None, + not_ready=None, + deadlocked=None, + active_runs=None, + finished_runs=0, + total_runs=0, + ): +""" +:param to_run: Tasks to run in the backfill +:type to_run: dict +:param started: Maps started task instance key to task instance object +:type started: dict +:param skipped: Tasks that have been skipped +:type skipped: set +:param succeeded: Tasks that have succeeded so far +:type succeeded: set +:param failed: Tasks that have failed +:type failed: set +:param not_ready: Tasks not ready for execution +:type not_ready: set +:param deadlocked: Deadlocked tasks +:type deadlocked: set +:param active_runs: Active tasks at a certain point in time +:type active_runs: list +:param finished_runs: Number of finished runs so far +:type finished_runs: int +:param total_runs: Number of total dag runs able to run +:type total_runs: int +""" +self.to_run = to_run or dict() +self.started = started or dict() +self.skipped = skipped or set() +self.succeeded = succeeded or set() +self.failed = failed or set() +self.not_ready = not_ready or set() +self.deadlocked = deadlocked or set() +self.active_runs = active_runs or list() +self.finished_runs = finished_runs +self.total_runs = total_runs + def __init__( self, dag, @@ -1841,41 +1897,38 @@ class BackfillJob(BaseJob): self.pool = pool super(BackfillJob, self).__init__(*args, **kwargs) -def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run): +def _update_counters(self, ti_status): """ Updates the counters per state of the tasks that were running. Can re-add to tasks to run in case required. -
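The refactor's central idea — replacing several separate counter arguments with one status object — can be sketched in a few lines. This is a hypothetical simplification for illustration, not the actual `_DagRunTaskStatus` API:

```python
# Hypothetical simplification of the status-object pattern introduced by
# this refactor: bundle the backfill bookkeeping into one object so helpers
# like _update_counters take a single argument instead of many collections.
class DagRunTaskStatus(object):
    def __init__(self):
        self.to_run = {}       # task key -> task instance still to run
        self.started = {}      # task key -> task instance currently running
        self.succeeded = set()
        self.skipped = set()
        self.failed = set()
        self.finished_runs = 0

def update_counters(status, key, state):
    # Move a started task into the bucket named by its terminal state.
    status.started.pop(key, None)
    getattr(status, state).add(key)

status = DagRunTaskStatus()
status.started["task_a"] = object()
update_counters(status, "task_a", "succeeded")
print(sorted(status.succeeded))  # ['task_a']
```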
[jira] [Created] (AIRFLOW-1448) Revert PR 2433 which added merge conflicts to master
Dan Davydov created AIRFLOW-1448: Summary: Revert PR 2433 which added merge conflicts to master Key: AIRFLOW-1448 URL: https://issues.apache.org/jira/browse/AIRFLOW-1448 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Assignee: Dan Davydov https://github.com/apache/incubator-airflow/pull/2433 has logical merge conflicts in master and causes tests to fail; it needs to be reverted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1442] Remove extra space from ignore_all_deps generated command
Repository: incubator-airflow Updated Branches: refs/heads/master 3547cbffd -> aa64f370b [AIRFLOW-1442] Remove extra space from ignore_all_deps generated command Fix extra whitespace in the ignore_all_deps arg which was causing commands to fail. Closes #2468 from aoen/ddavydov-- fix_ignore_all_deps_extra_space Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aa64f370 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aa64f370 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aa64f370 Branch: refs/heads/master Commit: aa64f370b28935a9fe0e692864a94fca89113bfe Parents: 3547cbf Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Fri Jul 21 14:06:29 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Jul 21 14:06:32 2017 -0700 -- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa64f370/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index c1fd4a3..d1f8e59 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -934,7 +934,7 @@ class TaskInstance(Base): cmd.extend(["--mark_success"]) if mark_success else None cmd.extend(["--pickle", str(pickle_id)]) if pickle_id else None cmd.extend(["--job_id", str(job_id)]) if job_id else None -cmd.extend(["-A "]) if ignore_all_deps else None +cmd.extend(["-A"]) if ignore_all_deps else None cmd.extend(["-i"]) if ignore_task_deps else None cmd.extend(["-I"]) if ignore_depends_on_past else None cmd.extend(["--force"]) if ignore_ti_state else None
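To see why the trailing space mattered: each element of the generated command list is passed through as a single argv token, so `"-A "` is a different (unknown) flag from `"-A"`. A minimal reproduction with a hypothetical argparse parser (not Airflow's actual CLI setup):

```python
import argparse

# Minimal reproduction (hypothetical parser, not Airflow's CLI wiring):
# each list element is one argv token, so a trailing space inside the
# token makes the flag unrecognizable to the parser.
parser = argparse.ArgumentParser()
parser.add_argument("-A", "--ignore_all_dependencies", action="store_true")

args = parser.parse_args(["-A"])   # clean token: parses fine
assert args.ignore_all_dependencies

try:
    parser.parse_args(["-A "])     # trailing space: parse error
except SystemExit:
    print("'-A ' rejected")
```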
[jira] [Created] (AIRFLOW-1442) Airflow Ignore All Deps Command Generation Has an Extra Space
Dan Davydov created AIRFLOW-1442: Summary: Airflow Ignore All Deps Command Generation Has an Extra Space Key: AIRFLOW-1442 URL: https://issues.apache.org/jira/browse/AIRFLOW-1442 Project: Apache Airflow Issue Type: Bug Reporter: Dan Davydov Assignee: Dan Davydov Airflow ignore-all-deps command generation has an extra space. This causes airflow commands (e.g. those generated by the webserver) to fail.
incubator-airflow git commit: [Airflow 1332] Split logs based on try number
Repository: incubator-airflow Updated Branches: refs/heads/master b9576d57b -> b49986c3b [Airflow 1332] Split logs based on try number This PR splits logs based on try number and add tabs to display different task instance tries. **Note this PR is a temporary change for separating task attempts. The code in this PR will be refactored in the future. Please refer to #2422 for Airflow logging abstractions redesign.** Testing: 1. Added unit tests. 2. Tested on localhost. 3. Tested on production environment with S3 remote storage, MySQL database, Redis, one Airflow scheduler and two airflow workers. Closes #2383 from AllisonWang/allison--add-task- attempt Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b49986c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b49986c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b49986c3 Branch: refs/heads/master Commit: b49986c3b24a5382f817d5a3fc40add87b464ba2 Parents: b9576d5 Author: AllisonWang <allisonwang...@gmail.com> Authored: Thu Jul 20 18:08:15 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 20 18:08:18 2017 -0700 -- airflow/bin/cli.py| 81 +-- airflow/models.py | 50 --- airflow/utils/logging.py | 66 + airflow/www/templates/airflow/ti_log.html | 40 ++ airflow/www/views.py | 180 + dags/test_dag.py | 4 +- docs/scheduler.rst| 19 ++- tests/models.py | 137 ++- tests/operators/python_operator.py| 88 +++- tests/utils/test_dates.py | 4 - tests/utils/test_logging.py | 29 11 files changed, 503 insertions(+), 195 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index f568d5d..a8543d3 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -59,7 +59,6 @@ from airflow.www.app import cached_app from sqlalchemy import func from sqlalchemy.orm import exc - 
api.load_auth() api_module = import_module(conf.get('cli', 'api_client')) api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'), @@ -316,7 +315,7 @@ def run(args, dag=None): # Load custom airflow config if args.cfg_path: with open(args.cfg_path, 'r') as conf_file: - conf_dict = json.load(conf_file) +conf_dict = json.load(conf_file) if os.path.exists(args.cfg_path): os.remove(args.cfg_path) @@ -327,6 +326,21 @@ def run(args, dag=None): settings.configure_vars() settings.configure_orm() +if not args.pickle and not dag: +dag = get_dag(args) +elif not dag: +session = settings.Session() +logging.info('Loading pickle id {args.pickle}'.format(args=args)) +dag_pickle = session.query( +DagPickle).filter(DagPickle.id == args.pickle).first() +if not dag_pickle: +raise AirflowException("Who hid the pickle!? [missing pickle]") +dag = dag_pickle.pickle + +task = dag.get_task(task_id=args.task_id) +ti = TaskInstance(task, args.execution_date) +ti.refresh_from_db() + logging.root.handlers = [] if args.raw: # Output to STDOUT for the parent process to read and log @@ -350,19 +364,23 @@ def run(args, dag=None): # writable by both users, then it's possible that re-running a task # via the UI (or vice versa) results in a permission error as the task # tries to write to a log file created by the other user. +try_number = ti.try_number log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) -directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args) +log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id, + args.execution_date) +directory = os.path.join(log_base, log_relative_dir) # Create the log file and give it group writable permissions # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag # operator is not compatible with impersonation (e.g. 
if a Celery executor is used # for a SubDag operator and the SubDag operator has a different owner than the # parent DAG) -if not os.path.exists(directory): +if not os.path.isdir(directory):
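The per-try layout introduced here can be illustrated with a small sketch. The helper below is hypothetical (the commit's actual helper is `logging_utils.get_log_directory`, and the exact layout may differ); it only shows the idea of giving each attempt its own log file instead of appending every try to one shared file:

```python
import os
from datetime import datetime

def try_log_path(base, dag_id, task_id, execution_date, try_number):
    """Hypothetical helper mirroring the per-try layout: one file per
    attempt, so each try's output can be displayed in its own tab."""
    return os.path.join(base, dag_id, task_id,
                        execution_date.isoformat(),
                        "{}.log".format(try_number))

path = try_log_path("/var/log/airflow", "example_dag", "example_task",
                    datetime(2017, 7, 20), 2)
print(path)  # /var/log/airflow/example_dag/example_task/2017-07-20T00:00:00/2.log
```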
incubator-airflow git commit: Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"
Repository: incubator-airflow Updated Branches: refs/heads/master e6ef06c53 -> b9576d57b Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging" This reverts commit e6ef06c53fd4449db6e665cce5cad9418dde232f which was committed accidentally. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9576d57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9576d57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9576d57 Branch: refs/heads/master Commit: b9576d57b6063908e488654f0b21b338c10069fd Parents: e6ef06c Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Thu Jul 20 18:07:17 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 20 18:07:17 2017 -0700 -- airflow/bin/cli.py | 111 ++--- airflow/config_templates/__init__.py| 13 -- airflow/config_templates/default_airflow.cfg| 6 - .../config_templates/default_airflow_logging.py | 73 - airflow/settings.py | 13 +- airflow/task_runner/base_task_runner.py | 1 - airflow/utils/log/__init__.py | 13 -- airflow/utils/log/file_task_handler.py | 158 --- airflow/utils/log/s3_task_handler.py| 90 --- airflow/utils/logging.py| 11 -- airflow/www/views.py| 104 +--- 11 files changed, 177 insertions(+), 416 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 11f415a..f568d5d 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,6 +22,7 @@ import os import socket import subprocess import textwrap +import warnings from importlib import import_module import argparse @@ -51,6 +52,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance, Connection) from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils +from airflow.utils import logging as logging_utils +from airflow.utils.file import mkdirs 
from airflow.www.app import cached_app from sqlalchemy import func @@ -324,6 +327,55 @@ def run(args, dag=None): settings.configure_vars() settings.configure_orm() +logging.root.handlers = [] +if args.raw: +# Output to STDOUT for the parent process to read and log +logging.basicConfig( +stream=sys.stdout, +level=settings.LOGGING_LEVEL, +format=settings.LOG_FORMAT) +else: +# Setting up logging to a file. + +# To handle log writing when tasks are impersonated, the log files need to +# be writable by the user that runs the Airflow command and the user +# that is impersonated. This is mainly to handle corner cases with the +# SubDagOperator. When the SubDagOperator is run, all of the operators +# run under the impersonated user and create appropriate log files +# as the impersonated user. However, if the user manually runs tasks +# of the SubDagOperator through the UI, then the log files are created +# by the user that runs the Airflow command. For example, the Airflow +# run command may be run by the `airflow_sudoable` user, but the Airflow +# tasks may be run by the `airflow` user. If the log files are not +# writable by both users, then it's possible that re-running a task +# via the UI (or vice versa) results in a permission error as the task +# tries to write to a log file created by the other user. +log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) +directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args) +# Create the log file and give it group writable permissions +# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag +# operator is not compatible with impersonation (e.g. if a Celery executor is used +# for a SubDag operator and the SubDag operator has a different owner than the +# parent DAG) +if not os.path.exists(directory): +# Create the directory as globally writable using custom mkdirs +# as os.makedirs doesn't set mode properly. 
+mkdirs(directory, 0o775) +iso = args.execution_date.isoformat() +filename = "{directory}/{iso}".format(**locals()) + +if not os.path.exists(filename): +open(filename, "
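The umask problem the reverted code's comment alludes to — `os.makedirs` narrowing the requested mode by the process umask — can be demonstrated with a small stand-in for the custom `mkdirs` (an assumed sketch, not its actual implementation):

```python
import os
import stat
import tempfile

def make_group_writable_dir(path, mode=0o775):
    # os.makedirs applies the process umask to its mode argument, so an
    # explicit chmod afterwards is needed to guarantee a group-writable
    # directory (the shared log-dir requirement described above).
    if not os.path.isdir(path):
        os.makedirs(path)
        os.chmod(path, mode)

d = os.path.join(tempfile.mkdtemp(), "example_dag", "example_task")
make_group_writable_dir(d)
print(oct(stat.S_IMODE(os.stat(d).st_mode)))  # 0o775
```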
incubator-airflow git commit: [AIRFLOW-1385] Create abstraction for Airflow task logging
Repository: incubator-airflow Updated Branches: refs/heads/master 392772326 -> e6ef06c53 [AIRFLOW-1385] Create abstraction for Airflow task logging This PR adds abilities to provide customized implementations of airflow task logging. It creates an abstraction for setting up, cleaning up and get task instance logs. This change is primarily a refactor of logging logic. It is tested locally with custom logging implementations. Closes #2422 from AllisonWang/allison--log-handler Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e6ef06c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e6ef06c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e6ef06c5 Branch: refs/heads/master Commit: e6ef06c53fd4449db6e665cce5cad9418dde232f Parents: 3927723 Author: AllisonWang <allisonwang...@gmail.com> Authored: Thu Jul 20 18:03:23 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 20 18:03:26 2017 -0700 -- airflow/bin/cli.py | 111 +++-- airflow/config_templates/__init__.py| 13 ++ airflow/config_templates/default_airflow.cfg| 6 + .../config_templates/default_airflow_logging.py | 73 + airflow/settings.py | 13 +- airflow/task_runner/base_task_runner.py | 1 + airflow/utils/log/__init__.py | 13 ++ airflow/utils/log/file_task_handler.py | 158 +++ airflow/utils/log/s3_task_handler.py| 90 +++ airflow/utils/logging.py| 11 ++ airflow/www/views.py| 104 +++- 11 files changed, 416 insertions(+), 177 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index f568d5d..11f415a 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,7 +22,6 @@ import os import socket import subprocess import textwrap -import warnings from importlib import import_module import argparse @@ -52,8 +51,6 @@ from airflow.models import (DagModel, DagBag, 
TaskInstance, Connection) from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from airflow.utils import logging as logging_utils -from airflow.utils.file import mkdirs from airflow.www.app import cached_app from sqlalchemy import func @@ -327,55 +324,6 @@ def run(args, dag=None): settings.configure_vars() settings.configure_orm() -logging.root.handlers = [] -if args.raw: -# Output to STDOUT for the parent process to read and log -logging.basicConfig( -stream=sys.stdout, -level=settings.LOGGING_LEVEL, -format=settings.LOG_FORMAT) -else: -# Setting up logging to a file. - -# To handle log writing when tasks are impersonated, the log files need to -# be writable by the user that runs the Airflow command and the user -# that is impersonated. This is mainly to handle corner cases with the -# SubDagOperator. When the SubDagOperator is run, all of the operators -# run under the impersonated user and create appropriate log files -# as the impersonated user. However, if the user manually runs tasks -# of the SubDagOperator through the UI, then the log files are created -# by the user that runs the Airflow command. For example, the Airflow -# run command may be run by the `airflow_sudoable` user, but the Airflow -# tasks may be run by the `airflow` user. If the log files are not -# writable by both users, then it's possible that re-running a task -# via the UI (or vice versa) results in a permission error as the task -# tries to write to a log file created by the other user. -log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) -directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args) -# Create the log file and give it group writable permissions -# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag -# operator is not compatible with impersonation (e.g. 
if a Celery executor is used -# for a SubDag operator and the SubDag operator has a different owner than the -# parent DAG) -if not os.path.exists(directory): -# Create the directory as globally writable using custom mkdirs -# as os.makedirs doesn't set mode properly. -mkdirs(direc
[jira] [Created] (AIRFLOW-1432) NVD3 Charts do not have labeled axes and units change dynamically
Dan Davydov created AIRFLOW-1432: Summary: NVD3 Charts do not have labeled axes and units change dynamically Key: AIRFLOW-1432 URL: https://issues.apache.org/jira/browse/AIRFLOW-1432 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov E.g. for the landing times chart, the y axis isn't labeled with units (e.g. minutes/hours/days), and those units change dynamically based on the data points in the chart and the pixel height of the chart. The y axis should be labeled in these charts.
incubator-airflow git commit: [AIRFLOW-1247] Fix ignore all dependencies argument ignored
Repository: incubator-airflow Updated Branches: refs/heads/master 4322d6dae -> e88ecff6a [AIRFLOW-1247] Fix ignore all dependencies argument ignored Fix a typo (missing comma) in the ignore_all_dependencies Arg definition. Closes #2441 from aoen/ddavydov--fix_ignore_all_deps_cli Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e88ecff6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e88ecff6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e88ecff6 Branch: refs/heads/master Commit: e88ecff6ac71758d763dd5037b177e7a2fbc9896 Parents: 4322d6d Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Thu Jul 13 15:38:00 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 13 15:38:03 2017 -0700 -- airflow/bin/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e88ecff6/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 4b3a0ed..f568d5d 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -1307,7 +1307,7 @@ class CLIFactory(object): 'ignore_all_dependencies': Arg( ("-A", "--ignore_all_dependencies"), "Ignores all non-critical dependencies, including ignore_ti_state and " -"ignore_task_deps" +"ignore_task_deps", "store_true"), # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too # vague (e.g. a task being in the appropriate state to be run is also a dependency
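The underlying gotcha is Python's implicit string-literal concatenation: without the comma, "store_true" fuses into the preceding help text instead of being passed as the action argument. A standalone illustration:

```python
# Python silently concatenates adjacent string literals, so the missing
# comma turned two intended arguments into one long help string.
buggy = (
    "Ignores all non-critical dependencies"
    "store_true"        # no comma above: fused into the help text
)
fixed = (
    "Ignores all non-critical dependencies",
    "store_true",       # comma restored: two separate items
)
assert isinstance(buggy, str)                  # one concatenated string
assert isinstance(fixed, tuple) and len(fixed) == 2
print(buggy)  # Ignores all non-critical dependenciesstore_true
```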
[jira] [Assigned] (AIRFLOW-1247) CLI: ignore all dependencies argument ignored
[ https://issues.apache.org/jira/browse/AIRFLOW-1247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov reassigned AIRFLOW-1247: Assignee: Dan Davydov (was: Matti Remes) > CLI: ignore all dependencies argument ignored > - > > Key: AIRFLOW-1247 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1247 > Project: Apache Airflow > Issue Type: Bug > Components: cli >Affects Versions: 1.8.1 >Reporter: Matti Remes >Assignee: Dan Davydov >Priority: Trivial > Fix For: 1.9.0 > > > Missing comma in Arg object creation
incubator-airflow git commit: [AIRFLOW-1366] Add max_tries to task instance
Repository: incubator-airflow Updated Branches: refs/heads/master b532d8d77 -> 4f20f6077 [AIRFLOW-1366] Add max_tries to task instance Right now Airflow deletes the task instance when user clear it. We have no way of keeping track of how many times a task instance gets run either via user or itself. So instead of deleting the task instance record, we should keep the task instance and make try_number monotonically increasing for every task instance attempt. max_tries is introduced as an upper bound for retrying tasks by task itself. This new column will be used to update logic behind clear_task_instances. db migration is tested locally. Closes #2409 from AllisonWang/allison--max-tries Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f20f607 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f20f607 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f20f607 Branch: refs/heads/master Commit: 4f20f607764bb3477419321b5dfd0c53ba1db3c0 Parents: b532d8d Author: AllisonWang <allisonwang...@gmail.com> Authored: Mon Jul 10 15:26:08 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Jul 10 15:26:12 2017 -0700 -- ...dc7_add_max_tries_column_to_task_instance.py | 106 +++ airflow/models.py | 3 + 2 files changed, 109 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py -- diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py new file mode 100644 index 000..2d5ffc2 --- /dev/null +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -0,0 +1,106 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""add max tries column to task instance + +Revision ID: cc1e65623dc7 +Revises: 127d2bf2dfa7 +Create Date: 2017-06-19 16:53:12.851141 + +""" + +# revision identifiers, used by Alembic. +revision = 'cc1e65623dc7' +down_revision = '127d2bf2dfa7' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa +from airflow import settings +from airflow.models import DagBag, TaskInstance + +BATCH_SIZE = 5000 + +def upgrade(): +op.add_column('task_instance', sa.Column('max_tries', sa.Integer, +server_default="-1")) +# Check if table task_instance exist before data migration. This check is +# needed for database that does not create table until migration finishes. +# Checking task_instance table exists prevent the error of querying +# non-existing task_instance table. +engine = settings.engine +if engine.dialect.has_table(engine, 'task_instance'): +# Get current session +connection = op.get_bind() +sessionmaker = sa.orm.sessionmaker() +session = sessionmaker(bind=connection) +dagbag = DagBag(settings.DAGS_FOLDER) +query = session.query(sa.func.count(TaskInstance.max_tries)).filter( +TaskInstance.max_tries == -1 +) +# Separate db query in batch to prevent loading entire table +# into memory and cause out of memory error. 
+while query.scalar(): +tis = session.query(TaskInstance).filter( +TaskInstance.max_tries == -1 +).limit(BATCH_SIZE).all() +for ti in tis: +dag = dagbag.get_dag(ti.dag_id) +if not dag or not dag.has_task(ti.task_id): +# task_instance table might not have the up-to-date +# information, i.e dag or task might be modified or +# deleted in dagbag but is reflected in task instance +# table. In this case we do not retry the task that can't +# be parsed. +ti.max_tries = ti.try_number +else: +task = dag.get_task(ti.task_id) +ti.max_tries =
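The migration's batching idea can be shown with a self-contained sketch. This uses plain sqlite3 rather than the real migration's Alembic/SQLAlchemy machinery, but the loop structure mirrors the BATCH_SIZE pattern above: update rows in fixed-size chunks so the whole task_instance table is never loaded into memory at once.

```python
import sqlite3

BATCH_SIZE = 2  # tiny for illustration; the migration uses 5000

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, max_tries INTEGER)")
conn.executemany("INSERT INTO task_instance (max_tries) VALUES (?)", [(-1,)] * 5)

# Keep fetching small batches of unmigrated rows until none remain.
while conn.execute(
        "SELECT COUNT(*) FROM task_instance WHERE max_tries = -1").fetchone()[0]:
    batch = [row[0] for row in conn.execute(
        "SELECT id FROM task_instance WHERE max_tries = -1 LIMIT ?",
        (BATCH_SIZE,))]
    conn.executemany(
        "UPDATE task_instance SET max_tries = 0 WHERE id = ?",
        [(i,) for i in batch])
    conn.commit()

remaining = conn.execute(
    "SELECT COUNT(*) FROM task_instance WHERE max_tries = -1").fetchone()[0]
print(remaining)  # 0
```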
[jira] [Created] (AIRFLOW-1351) airflow cli commands should log error/debug to stderr
Dan Davydov created AIRFLOW-1351: Summary: airflow cli commands should log error/debug to stderr Key: AIRFLOW-1351 URL: https://issues.apache.org/jira/browse/AIRFLOW-1351 Project: Apache Airflow Issue Type: Bug Components: cli Reporter: Dan Davydov Commands like airflow list_dags are printing messages like: {code}[2017-06-27 20:36:57,116] {models.py:373} DEBUG - Loaded DAG {code} to stdout, when the only thing printed to stdout should be the listed dag ids. This makes it hard for users to parse the output of these commands.
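The requested behavior can be sketched as follows (an assumed configuration, not Airflow's actual logging setup): route diagnostics to stderr so stdout carries only the machine-readable listing and stays safe to pipe or parse.

```python
import logging
import sys

# Diagnostics go to stderr; stdout carries only the listed ids.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
                    format="[%(asctime)s] %(message)s")

def list_dags(dag_ids):
    logging.debug("Loaded %d DAGs", len(dag_ids))  # stderr only
    for dag_id in dag_ids:
        print(dag_id)                              # stdout: ids only
    return dag_ids

list_dags(["example_dag_a", "example_dag_b"])
```

With this split, `airflow list_dags | grep example` would see only dag ids, never debug lines.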
[jira] [Created] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs
Dan Davydov created AIRFLOW-1311: Summary: Improve Webserver Load Time For Large DAGs Key: AIRFLOW-1311 URL: https://issues.apache.org/jira/browse/AIRFLOW-1311 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov Large DAGs can take an extremely long time to load in the Airflow UI (minutes/timeout). The fixes are as follows: 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing tasks by their depth, allowing users to expand sections for these DAGs, and ideally prefetching deeper tasks once the initial set of tasks has rendered) 2. Identify bottlenecks/performance issues in both the frontend and backend for rendering DAGs on the webserver and fix them. Airflow should be more performant for displaying DAGs that are somewhat large, e.g. DAGs that have up to 500 nodes and 2000 edges (dependencies from one task to another) should render within a couple of seconds. 3. Make DAG loading asynchronous in the UI (once the top-level tasks have loaded, display them immediately). We might not want to do this as users might try to click something only to have the UI change from underneath them [~saguziel]
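The depth-prioritized selection in fix (1) can be sketched compactly. The helper names and the dependency map below are illustrative, not Airflow APIs: given each task's upstream dependencies, compute task depth and render only the shallowest `limit` tasks initially.

```python
# Sketch of fix (1): pick the shallowest tasks for the initial render.
def depth(task, upstream, memo=None):
    # Depth = longest chain of upstream dependencies above this task.
    memo = {} if memo is None else memo
    if task not in memo:
        ups = upstream[task]
        memo[task] = 0 if not ups else 1 + max(depth(u, upstream, memo) for u in ups)
    return memo[task]

def initial_tasks(upstream, limit):
    # Shallowest first; ties broken alphabetically for a stable render order.
    return sorted(upstream, key=lambda t: (depth(t, upstream), t))[:limit]

upstream = {"extract": [], "transform": ["extract"],
            "load": ["transform"], "audit": ["extract"]}
print(initial_tasks(upstream, 3))  # ['extract', 'audit', 'transform']
```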
incubator-airflow git commit: [AIRFLOW-936] Add clear/mark success for DAG in the UI
Repository: incubator-airflow Updated Branches: refs/heads/master 6e3bcd318 -> 3c450fbe1 [AIRFLOW-936] Add clear/mark success for DAG in the UI This PR adds a modal popup when clicking circle DAG icon in Airflow tree view UI. It adds the functionalities to clear/mark success of the entire DAG run. This behavior is equivalent to individually clear/mark each task instance in the DAG run. The original logic of editing DAG run page is moved to the button "Edit DAG Run". Closes #2339 from AllisonWang/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c450fbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c450fbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c450fbe Branch: refs/heads/master Commit: 3c450fbe1abad7b76b59b6b3b15c6e29b4ad8d0f Parents: 6e3bcd3 Author: Allison Wang <allisonwang...@gmail.com> Authored: Tue Jun 13 18:56:41 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Jun 13 18:56:44 2017 -0700 -- airflow/api/common/experimental/mark_tasks.py | 33 +++- airflow/www/templates/airflow/dag.html| 60 +++ airflow/www/templates/airflow/tree.html | 11 +- airflow/www/views.py | 113 +--- tests/api/common/mark_tasks.py| 189 - 5 files changed, 372 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/api/common/experimental/mark_tasks.py -- diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index 0ddbf98..82eb4b5 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -22,7 +22,6 @@ from airflow.utils.state import State from sqlalchemy import or_ - def _create_dagruns(dag, execution_dates, state, run_id_template): """ Infers from the dates which dag runs need to be created and does so. 
@@ -181,7 +180,39 @@ def set_state(task, execution_date, upstream=False, downstream=False, if len(sub_dag_ids) > 0: tis_altered += qry_sub_dag.all() +session.expunge_all() session.close() return tis_altered +def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False): +""" +Set the state of a dag run and all task instances associated with the dag +run for a specific execution date. +:param dag: the DAG of which to alter state +:param execution_date: the execution date from which to start looking +:param state: the state to which the DAG need to be set +:param commit: commit DAG and tasks to be altered to the database +:return: list of tasks that have been created and updated +:raises: AssertionError if dag or execution_date is invalid +""" +res = [] + +if not dag or not execution_date: +return res + +# Mark all task instances in the dag run +for task in dag.tasks: +task.dag = dag +new_state = set_state(task=task, execution_date=execution_date, + state=state, commit=commit) +res.extend(new_state) + +# Mark the dag run +if commit: +drs = DagRun.find(dag.dag_id, execution_date=execution_date) +for dr in drs: +dr.dag = dag +dr.update_state() + +return res http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/dag.html -- diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index e5a305c..706ed32 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -216,6 +216,36 @@ + + + + + + + + + + + +Edit + + +Clear + + +Mark Success + + + + +Close + + + + + {% endblock %} {% block tail %} {{ lib.form_js() }} @@ -239,6 +269,7 @@ function updateQueryStringParameter(uri, key, value) { $('.never_active').removeClass('active'); }); +var id = ''; var dag_id = '{{ dag.dag_id }}'; var task_id = ''; var exection_date = ''; @@ -263,6 +294,14 @@ function updateQueryStringParameter(uri, key, value) { } } +function call_modal_dag(dag) { + id = 
dag && dag.id; + execution_date =
[jira] [Resolved] (AIRFLOW-1024) Handle CeleryExecutor errors gracefully
[ https://issues.apache.org/jira/browse/AIRFLOW-1024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov resolved AIRFLOW-1024. -- Resolution: Fixed > Handle CeleryExecutor errors gracefully > --- > > Key: AIRFLOW-1024 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1024 > Project: Apache Airflow > Issue Type: Bug > Reporter: Dan Davydov >Priority: Critical > Fix For: 1.8.1 > > > If the Airflow celery executor receives a bad response from a worker (e.g. > unpickled response), then it will crash the scheduler and cause it to restart. > We should code defensively around the interactions with celery so that we > just log errors instead of crashing the scheduler. > It might make sense to make the try catches one level higher (to catch > errors from all executors), but this needs some investigation.
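The "log instead of crash" approach described in the issue can be sketched as below. The shape is hypothetical (the real executor polls Celery AsyncResult objects): one malformed worker response is logged and skipped rather than taking down the scheduler loop.

```python
import logging

def sync_states(result_fetchers):
    # Defensively fetch each worker's reported state; a single bad
    # response is logged and skipped instead of crashing the caller.
    states = {}
    for key, fetch in result_fetchers.items():
        try:
            states[key] = fetch()  # may raise on a bad/unpicklable response
        except Exception:
            logging.exception("Error fetching Celery task state for %s", key)
    return states

def bad_fetch():
    raise ValueError("bad response")

results = sync_states({
    "task_ok": lambda: "SUCCESS",
    "task_bad": bad_fetch,
})
print(results)  # {'task_ok': 'SUCCESS'}
```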
[jira] [Updated] (AIRFLOW-1288) Bad owners field in DAGs breaks Airflow front page
[ https://issues.apache.org/jira/browse/AIRFLOW-1288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Davydov updated AIRFLOW-1288: - Description: DAGs that have owners set to a bad value break the front page of the webserver with an error like the one below. Instead, these should just cause import errors for the specific DAGs in question.

{code}
Ooops.
[ASCII-art error banner omitted]
--- Node: i-0dbbddfb63fb2cfbc.inst.aws.airbnb.com ---
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", line 103, in _nr_wrapper_Flask_handle_exception_
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", line 40, in _nr_wrapper_handler_
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 367, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 758, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 1909, in index
    all_dag_ids=all_dag_ids)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 307, in render
    return render_template(template, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", line 110, in literal_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 128, in render_template
    context, ctx.app)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 110, in _render
    rv = template.render(context)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", line 98, in dynamic_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 989, in render
    return self.environment.handle_exception(exc_info, True)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 754, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/templates/airflow/dags.html", line 18, in top-level template code
    {% extends "airflow/master.html" %}
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/templates/airflow/master.html", line 18, in top-level template code
    {% extends "admin/master.html" %}
  File "/usr/local/lib/
{code}
[jira] [Created] (AIRFLOW-1288) Bad owners field in DAGs breaks Airflow front page
Dan Davydov created AIRFLOW-1288: Summary: Bad owners field in DAGs breaks Airflow front page Key: AIRFLOW-1288 URL: https://issues.apache.org/jira/browse/AIRFLOW-1288 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov DAGs that have owners set to a bad value break the front page of the webserver with an error like the one quoted in the issue update above. Instead, these should just cause import errors for the specific DAGs in question.
[jira] [Created] (AIRFLOW-1280) Gantt Chart Height Isn't Set Properly
Dan Davydov created AIRFLOW-1280: Summary: Gantt Chart Height Isn't Set Properly Key: AIRFLOW-1280 URL: https://issues.apache.org/jira/browse/AIRFLOW-1280 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov The Gantt view sets a dynamic height for the chart body based on the number of tasks, but this is only a heuristic. Instead, we should calculate the height of the chart legend and make the total chart height equal to the legend height plus the desired height for the chart body. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
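The proposed fix above can be sketched as follows. This is a hypothetical illustration of the ticket's idea (total height = legend height + fixed body height), not code from the Airflow tree; the names `LEGEND_ROW_PX` and `BODY_PX` are assumed constants.

```python
# Sketch of the AIRFLOW-1280 proposal: size the Gantt chart from its parts
# instead of guessing. Both constants are illustrative assumptions.
LEGEND_ROW_PX = 20   # assumed pixel height of one legend entry (one task row)
BODY_PX = 600        # desired fixed height of the chart body itself

def gantt_chart_height(num_tasks):
    """Total chart height = computed legend height + desired body height."""
    legend_height = num_tasks * LEGEND_ROW_PX
    return legend_height + BODY_PX

print(gantt_chart_height(30))  # 30 * 20 + 600 = 1200
```

The point of the design is that the legend term is *computed* from the actual number of rendered rows, so long task lists can never overflow a hard-coded height.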
incubator-airflow git commit: [AIRFLOW-1263] Dynamic height for charts
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 53ad99106 -> e3e6aa719

[AIRFLOW-1263] Dynamic height for charts

Dynamic heights for webserver charts so that longer task names fit

Closes #2344 from aoen/ddavydov--dynamic_chart_heights

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e3e6aa71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e3e6aa71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e3e6aa71

Branch: refs/heads/master
Commit: e3e6aa71984f5fe2e9c36d1f177054b4457f5b34
Parents: 53ad991
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Jun 5 16:32:42 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Jun 5 16:32:46 2017 -0700

--
 airflow/www/views.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e3e6aa71/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e250111..4fd52fe 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -284,6 +284,16 @@ def should_hide_value_for_key(key_name):
         and conf.getboolean('admin', 'hide_sensitive_variable_fields')


+def get_chart_height(dag):
+    """
+    TODO(aoen): See [AIRFLOW-1263] We use the number of tasks in the DAG as a heuristic to
+    approximate the size of generated chart (otherwise the charts are tiny and unreadable
+    when DAGs have a large number of tasks). Ideally nvd3 should allow for dynamic-height
+    charts, that is charts that take up space based on the size of the components within.
+    """
+    return 600 + len(dag.tasks) * 10
+
+
 class Airflow(BaseView):

     def is_visible(self):
@@ -1411,10 +1421,12 @@ class Airflow(BaseView):
             include_upstream=True,
             include_downstream=False)

+        chart_height = get_chart_height(dag)
         chart = nvd3.lineChart(
-            name="lineChart", x_is_date=True, height=600, width="1200")
+            name="lineChart", x_is_date=True, height=chart_height, width="1200")
         cum_chart = nvd3.lineChart(
-            name="cumLineChart", x_is_date=True, height=600, width="1200")
+            name="cumLineChart", x_is_date=True, height=chart_height, width="1200")

         y = defaultdict(list)
         x = defaultdict(list)
@@ -1516,8 +1528,10 @@ class Airflow(BaseView):
             include_upstream=True,
             include_downstream=False)

+        chart_height = get_chart_height(dag)
         chart = nvd3.lineChart(
-            name="lineChart", x_is_date=True, y_axis_format='d', height=600, width="1200")
+            name="lineChart", x_is_date=True, y_axis_format='d', height=chart_height,
+            width="1200")

         for task in dag.tasks:
             y = []
@@ -1578,8 +1592,9 @@ class Airflow(BaseView):
             include_upstream=True,
             include_downstream=False)

+        chart_height = get_chart_height(dag)
         chart = nvd3.lineChart(
-            name="lineChart", x_is_date=True, height=600, width="1200")
+            name="lineChart", x_is_date=True, height=chart_height, width="1200")
         y = {}
         x = {}
         for task in dag.tasks:
@@ -1622,7 +1637,7 @@ class Airflow(BaseView):
             'airflow/chart.html',
             dag=dag,
             chart=chart.htmlcontent,
-            height="700px",
+            height=str(chart_height + 100) + "px",
             demo_mode=conf.getboolean('webserver', 'demo_mode'),
             root=root,
             form=form,
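The committed heuristic (a 600px base plus 10px per task) is easy to check in isolation. The `Dag` and `Task` stubs below are illustrative stand-ins for Airflow's real models, used only so the formula runs standalone:

```python
# Standalone check of the AIRFLOW-1263 height heuristic.
# Dag/Task are minimal stubs, not Airflow's actual classes.
class Task:
    pass

class Dag:
    def __init__(self, num_tasks):
        self.tasks = [Task() for _ in range(num_tasks)]

def get_chart_height(dag):
    # Same formula as the helper added to airflow/www/views.py:
    # base chart height plus 10px of headroom per task.
    return 600 + len(dag.tasks) * 10

print(get_chart_height(Dag(5)))    # 600 + 5 * 10 = 650
print(get_chart_height(Dag(100)))  # 600 + 100 * 10 = 1600
```

Note that the final hunk of the diff also bases the template's container height on the same value (`chart_height + 100`), replacing the previously hard-coded "700px", so the container grows in step with the chart.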