[jira] [Created] (AIRFLOW-3542) next_ds semantics broken for manually triggered runs

2018-12-19 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-3542:


 Summary: next_ds semantics broken for manually triggered runs
 Key: AIRFLOW-3542
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3542
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Affects Versions: 1.10.2
Reporter: Dan Davydov
Assignee: Dan Davydov


next_ds is useful when you need cron-style scheduling: a task that runs for date 
"X" uses that date in its logic, e.g. to send an email telling users that the 
run that was supposed to run for date "X" has completed. The problem is that it 
doesn't behave as expected for manually triggered runs, as illustrated by the 
diagrams below.
 
Using execution_date in a task

*Scheduled Run (works as expected)*
execution_date1                     start_date1
      \/                                 \/
       |---------------------------------|
        \_______ scheduling_interval ____/
 
*Manual Run* *(works as expected)*

triggered_date = execution_date = start_date
                     \/
                      |
 
Using next_ds in a task

*Scheduled Run (works as expected)*
next_ds1 = start_date1              next_ds2 = start_date2
      \/                                 \/
       |---------------------------------|
        \_______ scheduling_interval ____/
 
*Manual Run* *(next_ds1 is expected to match triggered_date, as in the case of
the manually triggered run that uses the regular execution_date above)*
triggered_date                      next_ds1 = start_date
      \/                                 \/
       |---------------------------------|
        \___ 0 to scheduling_interval ___/
            (depending on when the next execution date is)
Proposal
Have next_ds always be set to execution_date for manually triggered runs, 
instead of the next schedule-interval-aligned execution date.
 
This _might_ break backwards compatibility for some users, but it can be argued 
that the current behavior is a bug. If backwards compatibility is really 
desired, we can create new aliases that behave logically, although I am against 
this.
 
prev_ds should probably also be made consistent with this logic.
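 
A minimal sketch of the proposed semantics, assuming 1.10-era APIs 
(DagRun.external_trigger and DAG.following_schedule are real; the function name 
is illustrative):

{code:python}
# Hypothetical sketch of the proposal: for manually triggered runs, next_ds
# should just be the execution_date instead of the next schedule-aligned date.
def compute_next_execution_date(dag, dag_run):
    if dag_run.external_trigger:
        # Proposal: a manual run's next_ds matches its triggered date.
        return dag_run.execution_date
    # Scheduled runs keep the existing behavior.
    return dag.following_schedule(dag_run.execution_date)
{code}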



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-3233) Dag deletion in the UI doesn't work

2018-10-19 Thread Dan Davydov (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-3233:
-
Affects Version/s: 1.10.0

> Dag deletion in the UI doesn't work
> ---
>
> Key: AIRFLOW-3233
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3233
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.10.0
>    Reporter: Dan Davydov
>Assignee: Dan Davydov
>Priority: Major
>
> Dag deletion in the UI doesn't work: DAGs can only be deleted if the DAG 
> doesn't exist in the DagBag, but in that case the deletion URL gets passed an 
> empty DAG id.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3233) Dag deletion in the UI doesn't work

2018-10-19 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-3233:


 Summary: Dag deletion in the UI doesn't work
 Key: AIRFLOW-3233
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3233
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov
Assignee: Dan Davydov


Dag deletion in the UI doesn't work: DAGs can only be deleted if the DAG 
doesn't exist in the DagBag, but in that case the deletion URL gets passed an 
empty DAG id.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-3191) Fix not being able to specify execution_date when creating dagrun

2018-10-11 Thread Dan Davydov (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-3191:
-
Description: 
Creating a dagrun using the flask web UI is not possible because execution_date 
is hidden. This is because the column type is actually DATETIME, while Flask 
only has a converter registered for DateTime (it's the same type; the mismatch 
is purely a casing issue).

The fix is to add a form override: form_overrides = 
dict(execution_date=DateTimeField)

Only applies to the non-rbac webserver.

  was:
Creating a dagrun using the flask web UI is not possible because execution_date 
is hidden. This is because the column type is actually DATETIME, while Flask 
only has a converter registered for DateTime (it's the same type; the mismatch 
is purely a casing issue).

The fix is to add a form override: form_overrides = 
dict(execution_date=DateTimeField)


> Fix not being able to specify execution_date when creating dagrun
> -
>
> Key: AIRFLOW-3191
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3191
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.10.0
>    Reporter: Dan Davydov
>Assignee: Dan Davydov
>Priority: Major
>
> Creating a dagrun using the flask web UI is not possible because 
> execution_date is hidden. This is because the column type is actually 
> DATETIME, while Flask only has a converter registered for DateTime (it's the 
> same type; the mismatch is purely a casing issue).
> The fix is to add a form override: form_overrides = 
> dict(execution_date=DateTimeField)
> Only applies to the non-rbac webserver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3191) Cannot specify execution_date when creating dagrun

2018-10-11 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-3191:


 Summary: Cannot specify execution_date when creating dagrun
 Key: AIRFLOW-3191
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3191
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Affects Versions: 1.10.0
Reporter: Dan Davydov
Assignee: Dan Davydov


Creating a dagrun using the flask web UI is not possible because execution_date 
is hidden. This is because the column type is actually DATETIME, while Flask 
only has a converter registered for DateTime (it's the same type; the mismatch 
is purely a casing issue).

The fix is to add a form override: form_overrides = 
dict(execution_date=DateTimeField)
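
For illustration, a minimal sketch of where the override would live, assuming 
the 1.10-era non-rbac webserver where the DagRun view is defined in 
airflow/www/views.py:

{code:python}
# Hypothetical sketch: tell Flask-Admin to render execution_date with the
# wtforms DateTimeField so the column is no longer hidden in the create form.
from wtforms.fields import DateTimeField

class DagRunModelView(ModelViewOnly):  # ModelViewOnly: existing base class
    form_overrides = dict(execution_date=DateTimeField)
{code}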



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-3191) Fix not being able to specify execution_date when creating dagrun

2018-10-11 Thread Dan Davydov (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-3191:
-
Summary: Fix not being able to specify execution_date when creating dagrun  
(was: Cannot specify execution_date when creating dagrun)

> Fix not being able to specify execution_date when creating dagrun
> -
>
> Key: AIRFLOW-3191
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3191
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.10.0
>    Reporter: Dan Davydov
>Assignee: Dan Davydov
>Priority: Major
>
> Creating a dagrun using the flask web UI is not possible because 
> execution_date is hidden. This is because the column type is actually 
> DATETIME, while Flask only has a converter registered for DateTime (it's the 
> same type; the mismatch is purely a casing issue).
> The fix is to add a form override: form_overrides = 
> dict(execution_date=DateTimeField)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-3160) Load latest_dagruns asynchronously

2018-10-05 Thread Dan Davydov (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639886#comment-16639886
 ] 

Dan Davydov commented on AIRFLOW-3160:
--

Latency is still an issue if the DB is not collocated (e.g. 50 sequential DB 
calls); batching the queries is, at the least, definitely necessary.

> Load latest_dagruns asynchronously 
> ---
>
> Key: AIRFLOW-3160
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3160
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Affects Versions: 1.10.0
>    Reporter: Dan Davydov
>Assignee: Dan Davydov
>Priority: Major
>
> The front page loads very slowly when the DB has latency because one blocking 
> query is made per DAG against the DB.
>  
> The latest dagruns should be loaded asynchronously and in batch like the 
> other UI elements that query the database.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3160) Load latest_dagruns asynchronously

2018-10-04 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-3160:


 Summary: Load latest_dagruns asynchronously 
 Key: AIRFLOW-3160
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3160
 Project: Apache Airflow
  Issue Type: Improvement
  Components: webserver
Affects Versions: 1.10.0
Reporter: Dan Davydov
Assignee: Dan Davydov


The front page loads very slowly when the DB has latency because one blocking 
query is made per DAG against the DB.

 

The latest dagruns should be loaded asynchronously and in batch like the other 
UI elements that query the database.
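
A minimal sketch of what the batched query could look like, assuming 1.10-era 
models (the exact query shape is illustrative):

{code:python}
# Hypothetical sketch: fetch the latest DagRun per DAG in a single batched
# query instead of issuing one blocking query per DAG.
from sqlalchemy import func
from airflow.models import DagRun
from airflow.utils.db import create_session

with create_session() as session:
    latest = (
        session.query(DagRun.dag_id,
                      func.max(DagRun.execution_date).label('max_date'))
        .group_by(DagRun.dag_id)
        .subquery()
    )
    latest_runs = (
        session.query(DagRun)
        .join(latest, (DagRun.dag_id == latest.c.dag_id) &
                      (DagRun.execution_date == latest.c.max_date))
        .all()
    )
{code}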



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3050) Deactivate Stale DAGs in the Main Scheduler Loop

2018-09-12 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-3050:


 Summary: Deactivate Stale DAGs in the Main Scheduler Loop
 Key: AIRFLOW-3050
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3050
 Project: Apache Airflow
  Issue Type: Improvement
  Components: scheduler
Reporter: Dan Davydov
Assignee: Dan Davydov


Currently there is logic to deactivate stale DAGs in the scheduler, but it is 
only run after the scheduler completes. Since most companies run the scheduler 
in a continuous loop, this code never runs. This code should be moved into the 
main scheduler loop.

 

This would largely obviate the UI/API endpoints for deleting DAGs.
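
A minimal sketch of the idea, assuming 1.10-era models (the helper name is 
illustrative):

{code:python}
# Hypothetical sketch: inside the main scheduler loop, mark DAGs inactive
# when their source file is no longer among the files being scheduled.
from airflow.models import DagModel

def deactivate_stale_dags(session, known_file_paths):
    for dag in session.query(DagModel).filter(DagModel.is_active):
        if dag.fileloc not in known_file_paths:
            dag.is_active = False
    session.commit()
{code}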

 

Mailing List Thread On This Topic:

https://lists.apache.org/thread.html/8ac996a69bfa8214e5f56f2851cea349e0d83dcbb0d9e5af010525b9@%3Cdev.airflow.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2895) Prevent scheduler from spamming heartbeats/logs

2018-08-13 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2895:


 Summary: Prevent scheduler from spamming heartbeats/logs
 Key: AIRFLOW-2895
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2895
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Reporter: Dan Davydov
Assignee: Dan Davydov


There seem to be a couple of problems with 
[https://github.com/apache/incubator-airflow/pull/2986] that cause the sleep to 
not trigger and scheduler heartbeating/logs to be spammed:
 # If all of the files are being processed in the queue, there is no sleep (can 
be fixed by sleeping for min_sleep even if there are no files; see the sketch 
below).
 # I have heard reports that some files can return a parsing time that is 
monotonically increasing for some reason (e.g. a file actually parses in 1s on 
each loop, but the reported duration seems to use the very first time the file 
was parsed as the start time instead of the last time). I haven't confirmed 
this, but it sounds problematic.
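
A minimal sketch of the fix for problem 1 (min_sleep and the helper name are 
illustrative):

{code:python}
# Hypothetical sketch: enforce a minimum sleep per file-processing loop even
# when every file is already queued, so the loop cannot spin hot.
import time

def throttle(loop_start, min_sleep=1.0):
    elapsed = time.time() - loop_start
    if elapsed < min_sleep:
        time.sleep(min_sleep - elapsed)
{code}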

To unblock the release I'm reverting this PR for now. It should be re-added 
with tests/mocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2810] Fix typo in Xcom model timestamp

2018-07-27 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 233056e0d -> a338f3276


[AIRFLOW-2810] Fix typo in Xcom model timestamp

Fix typo in Xcom model timestamp field

No new testing - the field is already represented
in migrations

Closes #3652 from andywilco/fix_datetime_typo


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a338f327
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a338f327
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a338f327

Branch: refs/heads/master
Commit: a338f3276835af45765d24a6e6d43ad4ba4d66ba
Parents: 233056e
Author: Andy Wilcox 
Authored: Fri Jul 27 13:03:30 2018 -0400
Committer: Dan Davydov 
Committed: Fri Jul 27 13:03:36 2018 -0400

--
 airflow/models.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a338f327/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 1d832ab..cf7eb0a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4482,7 +4482,7 @@ class XCom(Base, LoggingMixin):
 key = Column(String(512))
 value = Column(LargeBinary)
 timestamp = Column(
-DateTime, default=timezone.utcnow, nullable=False)
+UtcDateTime, default=timezone.utcnow, nullable=False)
 execution_date = Column(UtcDateTime, nullable=False)
 
 # source information



[jira] [Created] (AIRFLOW-2422) Batch manage_slas queries

2018-05-04 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2422:


 Summary: Batch manage_slas queries
 Key: AIRFLOW-2422
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2422
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Reporter: Dan Davydov
Assignee: Dan Davydov


If too many SLA entries need to be inserted in the scheduler loop, the 
scheduler errors out when the connection times out. The SLA queries to MySQL 
need to be batched.
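
A minimal sketch of the batching (the chunk size and helper name are 
illustrative):

{code:python}
# Hypothetical sketch: insert SLA-miss rows in fixed-size chunks so one huge
# INSERT cannot outlive the MySQL connection timeout.
BATCH_SIZE = 500

def insert_sla_misses(session, sla_misses):
    for i in range(0, len(sla_misses), BATCH_SIZE):
        session.bulk_save_objects(sla_misses[i:i + BATCH_SIZE])
        session.commit()
{code}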



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [Airflow-2202] Add filter support in HiveMetastoreHook().max_partition()

2018-03-16 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 133e249e0 -> 9a315efc7


[Airflow-2202] Add filter support in HiveMetastoreHook().max_partition()

Adding back support for filter in max_partition(),
which could be used by some valid use cases. It
will work for tables with multiple partitions,
which is the behavior before (though the doc stated
it only works for a single-partitioned table). This
change also keeps the behavior when trying to get
the max partition on a sub-partitioned table without
supplying a filter: it will return the max partition
value of the partition key even if it is not unique.

Some extra checks are added to provide more
meaningful exception messages.

Closes #3117 from yrqls21/kevin_yang_add_filter


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a315efc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a315efc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a315efc

Branch: refs/heads/master
Commit: 9a315efc797da3936ef0ee9065307b1e02cab43a
Parents: 133e249
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Fri Mar 16 18:01:48 2018 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Mar 16 18:01:54 2018 -0700

--
 airflow/hooks/hive_hooks.py   | 109 -
 airflow/macros/hive.py|  21 ---
 tests/hooks/test_hive_hook.py |  57 +++
 3 files changed, 129 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a315efc/airflow/hooks/hive_hooks.py
--
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 128be41..7b5ff10 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -614,62 +614,95 @@ class HiveMetastoreHook(BaseHook):
 return [dict(zip(pnames, p.values)) for p in parts]
 
 @staticmethod
-def _get_max_partition_from_part_names(part_names, key_name):
-"""
-Helper method to get max partition from part names. Works only
-when partition format follows '{key}={value}' and key_name is name of
-the only partition key.
-:param part_names: list of partition names
-:type part_names: list
-:param key_name: partition key name
-:type key_name: str
-:return: Max partition or None if part_names is empty.
-"""
-if not part_names:
+def _get_max_partition_from_part_specs(part_specs, partition_key, filter_map):
+"""
+Helper method to get max partition of partitions with partition_key
+from part specs. key:value pair in filter_map will be used to
+filter out partitions.
+
+:param part_specs: list of partition specs.
+:type part_specs: list
+:param partition_key: partition key name.
+:type partition_key: string
+:param filter_map: partition_key:partition_value map used for partition filtering,
+   e.g. {'key1': 'value1', 'key2': 'value2'}.
+   Only partitions matching all partition_key:partition_value
+   pairs will be considered as candidates of max partition.
+:type filter_map: map
+:return: Max partition or None if part_specs is empty.
+"""
+if not part_specs:
 return None
 
-prefix = key_name + '='
-prefix_len = len(key_name) + 1
-max_val = None
-for part_name in part_names:
-if part_name.startswith(prefix):
-if max_val is None:
-max_val = part_name[prefix_len:]
-else:
-max_val = max(max_val, part_name[prefix_len:])
-else:
-raise AirflowException(
-"Partition name mal-formatted: {}".format(part_name))
-return max_val
+# Assuming all specs have the same keys.
+if partition_key not in part_specs[0].keys():
+raise AirflowException("Provided partition_key {} "
+   "is not in part_specs.".format(partition_key))
 
-def max_partition(self, schema, table_name, field=None):
+if filter_map and not set(filter_map.keys()) < set(part_specs[0].keys()):
+raise AirflowException("Keys in provided filter_map {} "
+   "are not subset of part_spec keys: {}"
+   .format(', '.join(filter_map.keys()),
+   ', '.join(part_specs[0

incubator-airflow git commit: [AIRFLOW-2150] Use lighter call in HiveMetastoreHook().max_partition()

2018-03-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0f9f4605f -> b8c2cea36


[AIRFLOW-2150] Use lighter call in HiveMetastoreHook().max_partition()

Call self.metastore.get_partition_names() instead of
self.metastore.get_partitions(), which is extremely expensive for
large tables, in HiveMetastoreHook().max_partition().

Closes #3082 from
yrqls21/kevin_yang_fix_hive_max_partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b8c2cea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b8c2cea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b8c2cea3

Branch: refs/heads/master
Commit: b8c2cea36299d6a3264d8bb1dc5a3995732b8855
Parents: 0f9f460
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Wed Mar 7 16:12:14 2018 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Mar 7 16:12:18 2018 -0800

--
 airflow/hooks/hive_hooks.py   | 64 ++
 airflow/macros/hive.py|  2 +-
 tests/hooks/test_hive_hook.py | 39 +++
 3 files changed, 91 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8c2cea3/airflow/hooks/hive_hooks.py
--
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index cd7319d..128be41 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -429,9 +429,11 @@ class HiveCliHook(BaseHook):
 
 
 class HiveMetastoreHook(BaseHook):
-
 """ Wrapper to interact with the Hive Metastore"""
 
+# java short max val
+MAX_PART_COUNT = 32767
+
 def __init__(self, metastore_conn_id='metastore_default'):
 self.metastore_conn = self.get_connection(metastore_conn_id)
 self.metastore = self.get_metastore_client()
@@ -601,16 +603,46 @@ class HiveMetastoreHook(BaseHook):
 if filter:
 parts = self.metastore.get_partitions_by_filter(
 db_name=schema, tbl_name=table_name,
-filter=filter, max_parts=32767)
+filter=filter, max_parts=HiveMetastoreHook.MAX_PART_COUNT)
 else:
 parts = self.metastore.get_partitions(
-db_name=schema, tbl_name=table_name, max_parts=32767)
+db_name=schema, tbl_name=table_name,
+max_parts=HiveMetastoreHook.MAX_PART_COUNT)
 
 self.metastore._oprot.trans.close()
 pnames = [p.name for p in table.partitionKeys]
 return [dict(zip(pnames, p.values)) for p in parts]
 
-def max_partition(self, schema, table_name, field=None, filter=None):
+@staticmethod
+def _get_max_partition_from_part_names(part_names, key_name):
+"""
+Helper method to get max partition from part names. Works only
+when partition format follows '{key}={value}' and key_name is name of
+the only partition key.
+:param part_names: list of partition names
+:type part_names: list
+:param key_name: partition key name
+:type key_name: str
+:return: Max partition or None if part_names is empty.
+"""
+if not part_names:
+return None
+
+prefix = key_name + '='
+prefix_len = len(key_name) + 1
+max_val = None
+for part_name in part_names:
+if part_name.startswith(prefix):
+if max_val is None:
+max_val = part_name[prefix_len:]
+else:
+max_val = max(max_val, part_name[prefix_len:])
+else:
+raise AirflowException(
+"Partition name mal-formatted: {}".format(part_name))
+return max_val
+
+def max_partition(self, schema, table_name, field=None):
 """
 Returns the maximum value for all partitions in a table. Works only
 for tables that have a single partition key. For subpartitioned
@@ -621,17 +653,23 @@ class HiveMetastoreHook(BaseHook):
 >>> hh.max_partition(schema='airflow', table_name=t)
 '2015-01-01'
 """
-parts = self.get_partitions(schema, table_name, filter)
-if not parts:
-return None
-elif len(parts[0]) == 1:
-field = list(parts[0].keys())[0]
-elif not field:
+self.metastore._oprot.trans.open()
+table = self.metastore.get_table(dbname=schema, tbl_name=table_name)
+if len(table.partitionKeys) != 1:
 raise AirflowException(
-"Please speci

[jira] [Reopened] (AIRFLOW-226) Create separate pip packages for webserver and hooks

2018-03-05 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov reopened AIRFLOW-226:
-

> Create separate pip packages for webserver and hooks
> 
>
> Key: AIRFLOW-226
> URL: https://issues.apache.org/jira/browse/AIRFLOW-226
> Project: Apache Airflow
>  Issue Type: Improvement
>        Reporter: Dan Davydov
>Priority: Minor
>
> There are users who want only the airflow hooks, and others who may not need 
> the front-end. The hooks and webserver should be moved into their own 
> packages, with the current airflow package depending on these packages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-226) Create separate pip packages for webserver and hooks

2018-03-05 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387016#comment-16387016
 ] 

Dan Davydov commented on AIRFLOW-226:
-

I feel strongly (at least for hooks) that they should be moved out. Things like 
storing secrets in the Airflow database, hooks, etc. are convenient, but they 
are equivalent to plugins and should have their own owners and maintainers. It 
doesn't make sense to, e.g., make the owner and expert of the HiveHook a 
committer in this repo, but they certainly should be the committer and 
maintainer of a HiveHook repo. Another point in favor of decoupling hooks from 
the core is that it doesn't scale for the Airflow committers to support 
backwards-incompatible changes to all operators; we are effectively supporting 
many hooks of which we have no domain knowledge. Other systems such as Jenkins 
follow a similar plugin framework.

> Create separate pip packages for webserver and hooks
> 
>
> Key: AIRFLOW-226
> URL: https://issues.apache.org/jira/browse/AIRFLOW-226
> Project: Apache Airflow
>  Issue Type: Improvement
>        Reporter: Dan Davydov
>Priority: Minor
>
> There are users who want only the airflow hooks, and others who may not need 
> the front-end. The hooks and webserver should be moved into their own 
> packages, with the current airflow package depending on these packages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2173) Don't check task IDs for concurrency reached check

2018-03-03 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2173:


 Summary: Don't check task IDs for concurrency reached check
 Key: AIRFLOW-2173
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2173
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Reporter: Dan Davydov
Assignee: Dan Davydov


Currently the concurrency-reached check filters the DB query to only include 
tasks that are present in the currently parsed DAG. For sufficiently large DAGs 
with many tasks, this causes MySQL to use an inefficient query plan and can put 
a lot of load on the database. Since there is no good reason to omit old 
running tasks in a DAG from the concurrency count, this filter should just be 
removed.
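
A minimal sketch of the simplified check, assuming 1.10-era models (the helper 
name and exact filters are illustrative):

{code:python}
# Hypothetical sketch: count running task instances for the DAG without the
# task_id IN (...) filter on the currently parsed task IDs.
from airflow.models import TaskInstance
from airflow.utils.state import State

def running_task_count(session, dag_id):
    return (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id,
                TaskInstance.state == State.RUNNING)
        .count()
    )
{code}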



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2156) Parallelize Celery Executor

2018-02-27 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2156:


 Summary: Parallelize Celery Executor
 Key: AIRFLOW-2156
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2156
 Project: Apache Airflow
  Issue Type: Improvement
  Components: celery
Reporter: Dan Davydov
Assignee: Dan Davydov


The CeleryExecutor doesn't currently check task states in parallel, since 
Celery does not support this. This can greatly slow down the scheduler loop, 
since each request to check a task's state is a network request.

The CeleryExecutor should parallelize these requests.
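
A minimal sketch of the idea (the helper name and pool size are illustrative):

{code:python}
# Hypothetical sketch: fetch Celery task states with a thread pool so the
# per-task network round trips overlap instead of running sequentially.
from concurrent.futures import ThreadPoolExecutor

def fetch_states(async_results, num_workers=16):
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(lambda r: r.state, async_results))
{code}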



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (AIRFLOW-908) Airflow run should print worker name at top of log

2018-02-14 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-908.
-
Resolution: Fixed

> Airflow run should print worker name at top of log
> --
>
> Key: AIRFLOW-908
> URL: https://issues.apache.org/jira/browse/AIRFLOW-908
> Project: Apache Airflow
>  Issue Type: New Feature
>        Reporter: Dan Davydov
>    Assignee: Dan Davydov
>Priority: Major
>  Labels: beginner, starter
>
> Airflow run should log the worker hostname at top of log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-1985] Impersonation fixes for using `run_as_user`

2018-02-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6f00f722b -> a0deb506c


[AIRFLOW-1985] Impersonation fixes for using `run_as_user`

Changes to propagate the config in
subdags, and to make sure running tasks have the
config.

Making sure all tasks have the proper
configuration by copying it into a
temporary file.

BashOperators should have the AIRFLOW_HOME and
PYTHONPATH context, as other tasks do.

Closes #2991 from edgarRd/erod-impersonation-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a0deb506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a0deb506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a0deb506

Branch: refs/heads/master
Commit: a0deb506c070637abc3c426bc7d060e3fe6c854d
Parents: 6f00f72
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Wed Feb 7 14:33:36 2018 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Feb 7 14:33:40 2018 -0800

--
 airflow/executors/base_executor.py   | 11 -
 airflow/jobs.py  | 10 +++-
 airflow/models.py|  2 +
 airflow/operators/bash_operator.py   | 24 +++--
 airflow/task/task_runner/base_task_runner.py | 32 +---
 airflow/utils/configuration.py   | 45 +
 airflow/utils/log/file_task_handler.py   |  2 +-
 tests/dags/test_impersonation_subdag.py  | 59 +++
 tests/impersonation.py   | 10 
 9 files changed, 167 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a0deb506/airflow/executors/base_executor.py
--
diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index d3d0675..d606057 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -58,8 +58,14 @@ class BaseExecutor(LoggingMixin):
 ignore_depends_on_past=False,
 ignore_task_deps=False,
 ignore_ti_state=False,
-pool=None):
+pool=None,
+cfg_path=None):
 pool = pool or task_instance.pool
+
+# TODO (edgarRd): AIRFLOW-1985:
+# cfg_path is needed to propagate the config values if using impersonation
+# (run_as_user), given that there are different code paths running tasks.
+# For a long term solution we need to address AIRFLOW-1986
 command = task_instance.command(
 local=True,
 mark_success=mark_success,
@@ -68,7 +74,8 @@ class BaseExecutor(LoggingMixin):
 ignore_task_deps=ignore_task_deps,
 ignore_ti_state=ignore_ti_state,
 pool=pool,
-pickle_id=pickle_id)
+pickle_id=pickle_id,
+cfg_path=cfg_path)
 self.queue_command(
 task_instance,
 command,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a0deb506/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ccb22cb..172792d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -58,6 +58,7 @@ from airflow.utils.db import create_session, provide_session
 from airflow.utils.email import send_email
 from airflow.utils.log.logging_mixin import LoggingMixin, set_context, 
StreamLogWriter
 from airflow.utils.state import State
+from airflow.utils.configuration import tmp_configuration_copy
 
 Base = models.Base
 ID_LEN = models.ID_LEN
@@ -2234,13 +2235,20 @@ class BackfillJob(BaseJob):
 # Skip scheduled state, we are executing immediately
 ti.state = State.QUEUED
 session.merge(ti)
+
+cfg_path = None
+if executor.__class__ in (executors.LocalExecutor,
+  executors.SequentialExecutor):
+cfg_path = tmp_configuration_copy()
+
 executor.queue_task_instance(
 ti,
 mark_success=self.mark_success,
 pickle_id=pickle_id,
 ignore_task_deps=self.ignore_task_deps,
 ignore_depends_on_past=ignore_depends_on_past,
-pool=self.pool)
+pool=self.pool,
+cfg_path=cfg_path)
 ti

[jira] [Created] (AIRFLOW-2061) Remove expensive redundant async.state calls in celery executor

2018-02-02 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2061:


 Summary: Remove expensive redundant async.state calls in celery executor
 Key: AIRFLOW-2061
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2061
 Project: Apache Airflow
  Issue Type: Improvement
  Components: executor
Reporter: Dan Davydov
Assignee: Dan Davydov


The CeleryExecutor makes the expensive async.state call multiple times, when it 
could reuse the result of the initial async.state call.
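
A minimal sketch of the fix (the helper name is illustrative):

{code:python}
# Hypothetical sketch: read the expensive async.state property once and
# branch on the cached value instead of re-fetching it per comparison.
from celery import states as celery_states

def resolve_state(async_result):
    """Return a terminal-state flag using a single async.state read."""
    state = async_result.state  # one network round trip
    if state == celery_states.SUCCESS:
        return 'success'
    if state in (celery_states.FAILURE, celery_states.REVOKED):
        return 'failed'
    return 'pending'
{code}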



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed

2018-01-29 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-2027:
-
Description: 
The scheduler loop sleeps for 1 second on every iteration unnecessarily. Remove 
this sleep to slightly speed up scheduling, and instead sleep once all files 
have been parsed. It can add up, since the sleep happens on every scheduler 
loop, which runs (# of DAG files to parse / scheduler parallelism) times.

Also remove the unnecessarily increased file-processing interval in tests, 
which slows them down.

  was:
The scheduler loop sleeps for 1 second on every iteration unnecessarily. Remove 
this sleep to slightly speed up scheduling. It can add up, since the sleep 
happens on every scheduler loop, which runs (# of DAG files to parse / 
scheduler parallelism) times.

Also remove the unnecessarily increased file-processing interval in tests, 
which slows them down.


> Only trigger sleep in scheduler after all files have parsed
> ---
>
> Key: AIRFLOW-2027
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2027
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>    Reporter: Dan Davydov
>    Assignee: Dan Davydov
>Priority: Major
>
> The scheduler loop sleeps for 1 second on every iteration unnecessarily. 
> Remove this sleep to slightly speed up scheduling, and instead sleep once all 
> files have been parsed. It can add up, since the sleep happens on every 
> scheduler loop, which runs (# of DAG files to parse / scheduler parallelism) 
> times.
> Also remove the unnecessarily increased file-processing interval in tests, 
> which slows them down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2023] Add debug logging around number of queued files

2018-01-29 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master da0e628fa -> 61ff29e57


[AIRFLOW-2023] Add debug logging around number of queued files

Add debug logging around number of queued files to
process in the
scheduler. This makes it easy to see when there
are bottlenecks due to parallelism and how long it
takes for all files to be processed.

Closes #2968 from aoen/ddavydov--
add_more_scheduler_metrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61ff29e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61ff29e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61ff29e5

Branch: refs/heads/master
Commit: 61ff29e578d1121ab4606fe122fb4e2db8f075b9
Parents: da0e628
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Jan 29 12:01:58 2018 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Jan 29 12:02:02 2018 -0800

--
 airflow/utils/dag_processing.py | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61ff29e5/airflow/utils/dag_processing.py
--
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index dc0c7a6..b4abd34 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -476,6 +476,12 @@ class DagFileProcessorManager(LoggingMixin):
 running_processors[file_path] = processor
 self._processors = running_processors
 
+self.log.debug("%s/%s scheduler processes running",
+   len(self._processors), self._parallelism)
+
+self.log.debug("%s file paths queued for processing",
+   len(self._file_path_queue))
+
 # Collect all the DAGs that were found in the processed files
 simple_dags = []
 for file_path, processor in finished_processors.items():



[jira] [Updated] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed

2018-01-29 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-2027:
-
Summary: Only trigger sleep in scheduler after all files have parsed  (was: 
Remove unnecessary 1s sleep in scheduler loop)

> Only trigger sleep in scheduler after all files have parsed
> ---
>
> Key: AIRFLOW-2027
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2027
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>    Reporter: Dan Davydov
>    Assignee: Dan Davydov
>Priority: Major
>
> The scheduler loop sleeps for 1 second on every iteration unnecessarily. 
> Remove this sleep to slightly speed up scheduling. It can add up, since the 
> sleep happens on every scheduler loop, which runs (# of DAG files to parse / 
> scheduler parallelism) times.
>  
> Also remove the unnecessarily increased file-processing interval in tests, 
> which slows them down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2027) Remove unnecessary 1s sleep in scheduler loop

2018-01-26 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341861#comment-16341861
 ] 

Dan Davydov commented on AIRFLOW-2027:
--

The PR I'm about to cut will sleep for (1s minus the time it took to parse the 
DAGs), so each loop still takes 1s minimum. I'm not greatly concerned about the 
busy-looping, really, but the log spam is an issue.

We have found that in high-load situations, 10% of the delay was caused by this 
sleep.
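
A minimal sketch of what that looks like (the helper name is illustrative):

{code:python}
# Hypothetical sketch: sleep only for the remainder of the 1s budget after
# parsing, so each loop takes at least 1s without always adding a full second.
import time

def run_loop_iteration(parse_fn, min_loop_seconds=1.0):
    start = time.time()
    parse_fn()  # parse the DAG files for this iteration
    elapsed = time.time() - start
    time.sleep(max(0.0, min_loop_seconds - elapsed))
{code}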

> Remove unnecessary 1s sleep in scheduler loop
> -
>
> Key: AIRFLOW-2027
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2027
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>    Reporter: Dan Davydov
>    Assignee: Dan Davydov
>Priority: Major
>
> The scheduler loop sleeps for 1 second on every iteration unnecessarily. 
> Remove this sleep to slightly speed up scheduling. It can add up, since the 
> sleep happens on every scheduler loop, which runs (# of DAG files to parse / 
> scheduler parallelism) times.
>  
> Also remove the unnecessarily increased file-processing interval in tests, 
> which slows them down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2027) Remove unnecessary 1s sleep in scheduler loop

2018-01-24 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-2027:


 Summary: Remove unnecessary 1s sleep in scheduler loop
 Key: AIRFLOW-2027
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2027
 Project: Apache Airflow
  Issue Type: Improvement
  Components: scheduler
Reporter: Dan Davydov
Assignee: Dan Davydov


The scheduler loop sleeps for 1 second on every iteration unnecessarily. Remove 
this sleep to slightly speed up scheduling. It can add up, since the sleep 
happens on every scheduler loop, which runs (# of DAG files to parse / 
scheduler parallelism) times.

Also remove the unnecessarily increased file-processing interval in tests, 
which slows them down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-1985) Subdag tasks do not work with impersonation

2018-01-18 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1985:
-
Description: 
When using {{run_as_user}} for impersonation, airflow creates a subset of the 
configuration to make it available to the task during execution via 
{{base_task_runner.py}}. This behavior is consistent when triggering the subdag 
task.
 The above code path is not executed when running tasks within a subdag using 
the {{SequentialExecutor}}, where each task is run directly.

Note that in the context of subdags, tasks running for the subdags are already 
running in the same context as the user, so no additional impersonation is 
needed; but since it's not guaranteed that the user has the right configuration 
settings (hence why we copy a subset of the configuration during 
impersonation), we need to propagate those settings to the tasks within the 
subdag as well.

This change also requires exporting the AIRFLOW_HOME and PYTHONPATH env 
variables in the bash operator so that run_as_user can work if airflow 
operators are called from a bash operator (e.g. if a bash operator calls python 
code that imports airflow but the airflow user isn't the same as the 
run_as_user).

  was:
When using {{run_as_user}} for impersonation, airflow creates a subset of the 
configuration to make it available to the task during execution via 
{{base_task_runner.py}}. This behavior is consistent when triggering the subdag 
task.
The above code path is not executed when running tasks within a subdag using 
the {{SequentialExecutor}}, where each task is run directly.

Note that in the context of subdags, tasks running for the subdags are already 
running in the same context as the user, so no additional impersonation is 
needed; but since it's not guaranteed that the user has the right configuration 
settings (hence why we copy a subset of the configuration during 
impersonation), we need to propagate those settings to the tasks within the 
subdag as well.


> Subdag tasks do not work with impersonation
> ---
>
> Key: AIRFLOW-1985
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1985
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Edgar Rodriguez
>Assignee: Edgar Rodriguez
>Priority: Major
>
> When using {{run_as_user}} for impersonation, airflow creates a subset of the 
> configuration to make it available to the task during execution via 
> {{base_task_runner.py}}. This behavior is consistent when triggering the 
> subdag task.
>  The above code path is not executed when running tasks within a subdag using 
> the {{SequentialExecutor}}, where each task is run directly.
> Note that in the context of subdags, tasks running for the subdags are 
> already running in the same context as the user, so no additional 
> impersonation is needed; but since it's not guaranteed that the user has the 
> right configuration settings (hence why we copy a subset of the configuration 
> during impersonation), we need to propagate those settings to the tasks 
> within the subdag as well.
> This change also requires exporting the AIRFLOW_HOME and PYTHONPATH env 
> variables in the bash operator so that run_as_user can work if airflow 
> operators are called from a bash operator (e.g. if a bash operator calls 
> python code that imports airflow but the airflow user isn't the same as the 
> run_as_user).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-1980) Scheduler DB Queries Should be Batched

2018-01-09 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1980:
-
Description: 
DB queries in the scheduler should be batched, since the larger ones can cause 
lock wait timeouts.

One example is the queries in the _change_state_for_tis_without_dagrun function 
in jobs.py

The orphan task cleanup is another such piece of code.

  was:
DB queries in the scheduler should be batched, since the larger ones can cause 
lock wait timeouts.

One example is the queries in the _change_state_for_tis_without_dagrun function 
in jobs.py


> Scheduler DB Queries Should be Batched
> --
>
> Key: AIRFLOW-1980
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1980
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>    Reporter: Dan Davydov
>
> DB queries in the scheduler should be batched, since the larger ones can 
> cause lock wait timeouts.
> One example is the queries in the _change_state_for_tis_without_dagrun 
> function in jobs.py
> The orphan task cleanup is another such piece of code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1980) Scheduler DB Queries Should be Batched

2018-01-09 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1980:
-
Description: 
DB queries in the scheduler should be batched, since the larger ones can cause 
lock wait timeouts.

One example is the queries in the _change_state_for_tis_without_dagrun function 
in jobs.py

The orphan task cleanup is another such piece of code that should have the 
queries done in batch.

  was:
DB queries in the scheduler should be batched, since the larger ones can cause 
lock wait timeouts.

One example is the queries in the _change_state_for_tis_without_dagrun function 
in jobs.py

The orphan task cleanup is another such piece of code.


> Scheduler DB Queries Should be Batched
> --
>
> Key: AIRFLOW-1980
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1980
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>    Reporter: Dan Davydov
>
> DB queries in the scheduler should be batched, since the larger ones can 
> cause lock wait timeouts.
> One example is the queries in the _change_state_for_tis_without_dagrun 
> function in jobs.py
> The orphan task cleanup is another such piece of code that should have the 
> queries done in batch.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1980) Scheduler DB Queries Should be Batched

2018-01-09 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1980:


 Summary: Scheduler DB Queries Should be Batched
 Key: AIRFLOW-1980
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1980
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Reporter: Dan Davydov


DB queries in the scheduler should be batched, since the larger ones can cause 
lock wait timeouts.

One example is the queries in the _change_state_for_tis_without_dagrun function 
in jobs.py
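
A minimal sketch of the batching, assuming 1.10-era models (the chunk size and 
helper name are illustrative):

{code:python}
# Hypothetical sketch: update task instances in fixed-size chunks so a single
# large UPDATE cannot hold row locks long enough to hit lock wait timeouts.
from airflow.models import TaskInstance
from airflow.utils.state import State

CHUNK = 500

def reset_states(session, dag_id, task_ids):
    for i in range(0, len(task_ids), CHUNK):
        (session.query(TaskInstance)
         .filter(TaskInstance.dag_id == dag_id,
                 TaskInstance.task_id.in_(task_ids[i:i + CHUNK]))
         .update({TaskInstance.state: State.NONE},
                 synchronize_session=False))
        session.commit()
{code}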



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1971] Propagate hive config on impersonation

2018-01-08 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e46cde418 -> e9c1ac588


[AIRFLOW-1971] Propagate hive config on impersonation

Currently, if hive-specific settings are defined
in the configuration file, they are not being
propagated when using impersonation. We need to
propagate this configuration down to the
impersonated process.

Closes #2920 from edgarRd/erod-propagate-hive-conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9c1ac58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9c1ac58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9c1ac58

Branch: refs/heads/master
Commit: e9c1ac588a698b88f916d6f47531d7e0dc63237d
Parents: e46cde4
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Mon Jan 8 11:25:12 2018 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Jan 8 11:25:16 2018 -0800

--
 airflow/task/task_runner/base_task_runner.py |  1 +
 tests/dags/test_impersonation_custom.py  | 10 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9c1ac58/airflow/task/task_runner/base_task_runner.py
--
diff --git a/airflow/task/task_runner/base_task_runner.py 
b/airflow/task/task_runner/base_task_runner.py
index 664a873..11a4745 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -64,6 +64,7 @@ class BaseTaskRunner(LoggingMixin):
 'smtp': cfg_dict.get('smtp', {}),
 'scheduler': cfg_dict.get('scheduler', {}),
 'webserver': cfg_dict.get('webserver', {}),
+'hive': cfg_dict.get('hive', {}),  # we should probably generalize this
 }
 temp_fd, cfg_path = mkstemp()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9c1ac58/tests/dags/test_impersonation_custom.py
--
diff --git a/tests/dags/test_impersonation_custom.py 
b/tests/dags/test_impersonation_custom.py
index 6f35b38..b6dd9c8 100644
--- a/tests/dags/test_impersonation_custom.py
+++ b/tests/dags/test_impersonation_custom.py
@@ -41,7 +41,17 @@ def print_today():
 print('Today is {}'.format(dt.strftime('%Y-%m-%d')))
 
 
+def check_hive_conf():
+from airflow import configuration as conf
+assert conf.get('hive', 'default_hive_mapred_queue') == 'airflow'
+
+
 PythonOperator(
 python_callable=print_today,
 task_id='exec_python_fn',
 dag=dag)
+
+PythonOperator(
+python_callable=check_hive_conf,
+task_id='exec_check_hive_conf_fn',
+dag=dag)



incubator-airflow git commit: [AIRFLOW-1963] Add config for HiveOperator mapred_queue

2018-01-03 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 07c2a515e -> b3489b99e


[AIRFLOW-1963] Add config for HiveOperator mapred_queue

Adding configuration setting for specifying a
default mapred_queue for
hive jobs using the HiveOperator.

Closes #2915 from edgarRd/erod-hive-mapred-queue-
config


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3489b99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3489b99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3489b99

Branch: refs/heads/master
Commit: b3489b99e9140e25bdd08b78f57ba845c3edb358
Parents: 07c2a51
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Wed Jan 3 14:23:09 2018 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Jan 3 14:23:14 2018 -0800

--
 airflow/config_templates/default_airflow.cfg |  4 +-
 airflow/config_templates/default_test.cfg|  3 +
 airflow/hooks/hive_hooks.py  |  4 +-
 airflow/operators/hive_operator.py   | 19 --
 scripts/ci/airflow_travis.cfg|  3 +
 tests/operators/hive_operator.py | 82 ++-
 6 files changed, 77 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_airflow.cfg
--
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index a7a3b7d..d0dfb72 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -170,6 +170,9 @@ default_ram = 512
 default_disk = 512
 default_gpus = 0
 
+[hive]
+# Default mapreduce queue for HiveOperator tasks
+default_hive_mapred_queue =
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
@@ -458,7 +461,6 @@ keytab = airflow.keytab
 [github_enterprise]
 api_rev = v3
 
-
 [admin]
 # UI to hide sensitive variable fields when set to True
 hide_sensitive_variable_fields = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_test.cfg
--
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index 85343ee..eaf3d03 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,9 @@ auth_backend = airflow.api.auth.backend.default
 [operators]
 default_owner = airflow
 
+[hive]
+default_hive_mapred_queue = airflow
+
 [webserver]
 base_url = http://localhost:8080
 web_server_host = 0.0.0.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/hooks/hive_hooks.py
--
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index eb39469..3d986fd 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -25,6 +25,7 @@ import time
 from tempfile import NamedTemporaryFile
 import hive_metastore
 
+from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.helpers import as_flattened_list
@@ -82,7 +83,8 @@ class HiveCliHook(BaseHook):
 "Invalid Mapred Queue Priority.  Valid values are: "
 "{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
 
-self.mapred_queue = mapred_queue
-self.mapred_queue = mapred_queue
+self.mapred_queue = mapred_queue or conf.get('hive',
+ 'default_hive_mapred_queue')
 self.mapred_job_name = mapred_job_name
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/operators/hive_operator.py
--
diff --git a/airflow/operators/hive_operator.py 
b/airflow/operators/hive_operator.py
index 221feeb..ce98544 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -77,13 +77,19 @@ class HiveOperator(BaseOperator):
 self.mapred_queue_priority = mapred_queue_priority
 self.mapred_job_name = mapred_job_name
 
+# assigned lazily - just for consistency we can create the attribute with a
+# `None` initial value, later it will be populated by the execute method.
+# This also makes `on_kill` implementation consistent since it assumes `self.hook`
+# is defined.
+self.hook = None
+
 def get_hook

[jira] [Commented] (AIRFLOW-511) DagRun Failure: Retry, Email, Callbacks

2017-12-13 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290485#comment-16290485
 ] 

Dan Davydov commented on AIRFLOW-511:
-

You might be able to accomplish this for now by using the task SLA feature on a 
task that depends on all of the other tasks in the DAG.
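
A minimal sketch of that workaround, assuming an existing dag object (the SLA 
threshold is illustrative):

{code:python}
# Hypothetical sketch: a terminal task that depends on every leaf task and
# carries an SLA, approximating a DagRun-level failure/timeout notification.
from datetime import timedelta
from airflow.operators.dummy_operator import DummyOperator

dag_complete = DummyOperator(
    task_id='dag_complete',
    sla=timedelta(hours=2),  # illustrative threshold
    dag=dag)  # assumes an existing `dag` object

# Depend on every current leaf task so the SLA covers the whole run.
leaf_tasks = [t for t in dag.tasks if not t.downstream_list]
dag_complete.set_upstream(leaf_tasks)
{code}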

> DagRun Failure: Retry, Email, Callbacks
> ---
>
> Key: AIRFLOW-511
> URL: https://issues.apache.org/jira/browse/AIRFLOW-511
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Rob Froetscher
>Priority: Minor
>
> Are there any plans to have retry, email, or callbacks on failure of a DAG 
> run? Would you guys be open to someone implementing that? Right now 
> particularly with dagrun_timeout, there is not much insight that the dag 
> actually stopped.
> Pseudocode: 
> https://github.com/apache/incubator-airflow/compare/master...rfroetscher:dagrun_failure



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1819] Fix slack operator unittest bug

2017-11-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d8e8f9014 -> 3c8f7747b


[AIRFLOW-1819] Fix slack operator unittest bug

Fix failing slack operator unittest and add test
coverage.

Closes #2791 from yrqls21/kevin-yang-fix-unit-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c8f7747
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c8f7747
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c8f7747

Branch: refs/heads/master
Commit: 3c8f7747b08ca5c23233799e56a040f60e3d0fc6
Parents: d8e8f90
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Wed Nov 15 17:52:06 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Nov 15 17:52:09 2017 -0800

--
 tests/operators/slack_operator.py | 16 +---
 1 file changed, 13 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c8f7747/tests/operators/slack_operator.py
--
diff --git a/tests/operators/slack_operator.py 
b/tests/operators/slack_operator.py
index 5e40648..4d6b553 100644
--- a/tests/operators/slack_operator.py
+++ b/tests/operators/slack_operator.py
@@ -58,6 +58,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase):
 }
 ]
 self.test_attachments_in_json = json.dumps(self.test_attachments)
+self.test_api_params = {'key': 'value'}
 self.test_kwarg = 'test_kwarg'
 
 self.expected_method = 'chat.postMessage'
@@ -69,7 +70,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase):
 'attachments': self.test_attachments_in_json,
 }
 
-def __construct_operator(self, test_token, test_slack_conn_id):
+def __construct_operator(self, test_token, test_slack_conn_id, test_api_params=None):
 return SlackAPIPostOperator(
 task_id='slack',
 username=self.test_username,
@@ -79,6 +80,7 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase):
 text=self.test_text,
 icon_url=self.test_icon_url,
 attachments=self.test_attachments,
+api_params=test_api_params,
 kwarg=self.test_kwarg
 )
 
@@ -96,6 +98,14 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase):
 
 slack_hook_mock.call.assert_called_with(self.expected_method, self.expected_api_params)
 
+slack_api_post_operator = self.__construct_operator(test_token, None, self.test_api_params)
+
+slack_api_post_operator.execute()
+
+slack_hook_class_mock.assert_called_with(token=test_token, slack_conn_id=None)
+
+slack_hook_mock.call.assert_called_with(self.expected_method, self.test_api_params)
+
 @mock.patch('airflow.operators.slack_operator.SlackHook')
 def test_execute_with_slack_conn_id_only(self, slack_hook_class_mock):
 slack_hook_mock = mock.Mock()
@@ -121,13 +131,13 @@ class SlackAPIPostOperatorTestCase(unittest.TestCase):
 test_token = 'test_token'
 test_slack_conn_id = 'test_slack_conn_id'
 
-slack_api_post_operator = self.__construct_operator(test_token, None)
+slack_api_post_operator = self.__construct_operator(test_token, None, self.test_api_params)
 self.assertEqual(slack_api_post_operator.token, test_token)
 self.assertEqual(slack_api_post_operator.slack_conn_id, None)
 self.assertEqual(slack_api_post_operator.method, self.expected_method)
 self.assertEqual(slack_api_post_operator.text, self.test_text)
 self.assertEqual(slack_api_post_operator.channel, self.test_channel)
-self.assertEqual(slack_api_post_operator.api_params, self.expected_api_params)
+self.assertEqual(slack_api_post_operator.api_params, self.test_api_params)
 self.assertEqual(slack_api_post_operator.username, self.test_username)
 self.assertEqual(slack_api_post_operator.icon_url, self.test_icon_url)
 self.assertEqual(slack_api_post_operator.attachments, self.test_attachments)



incubator-airflow git commit: [AIRFLOW-1805] Allow Slack token to be passed through connection

2017-11-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d04519e60 -> d8e8f9014


[AIRFLOW-1805] Allow Slack token to be passed through connection

Allow users to pass the Slack token through a
connection, which can provide better security.
This enables users to expose the token only to
workers instead of to both workers and schedulers.

Closes #2789 from
yrqls21/add_conn_supp_in_slack_op


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8e8f901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8e8f901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8e8f901

Branch: refs/heads/master
Commit: d8e8f90142246ae5b02c1a0f9649ea5a419a5afc
Parents: d04519e
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Wed Nov 15 14:53:56 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Nov 15 14:53:58 2017 -0800

--
 airflow/hooks/__init__.py   |   1 +
 airflow/hooks/slack_hook.py |  56 
 airflow/operators/slack_operator.py |  25 --
 tests/hooks/test_slack_hook.py  | 101 ++
 tests/operators/slack_operator.py   | 141 +++
 5 files changed, 315 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/__init__.py
--
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 6e96e2a..6372b2f 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -55,6 +55,7 @@ _hooks = {
 'dbapi_hook': ['DbApiHook'],
 'mssql_hook': ['MsSqlHook'],
 'oracle_hook': ['OracleHook'],
+'slack_hook': ['SlackHook'],
 }
 
 import os as _os

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/slack_hook.py
--
diff --git a/airflow/hooks/slack_hook.py b/airflow/hooks/slack_hook.py
new file mode 100644
index 000..cd47573
--- /dev/null
+++ b/airflow/hooks/slack_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from slackclient import SlackClient
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+
+class SlackHook(BaseHook):
+    """
+    Interact with Slack, using the slackclient library.
+    """
+
+    def __init__(self, token=None, slack_conn_id=None):
+        """
+        Takes either a Slack API token directly or a connection that has a
+        Slack API token. If both are supplied, the token will be used.
+
+        :param token: Slack API token
+        :type token: string
+        :param slack_conn_id: connection that has a Slack API token in the
+            password field
+        :type slack_conn_id: string
+        """
+        self.token = self.__get_token(token, slack_conn_id)
+
+    def __get_token(self, token, slack_conn_id):
+        if token is not None:
+            return token
+        elif slack_conn_id is not None:
+            conn = self.get_connection(slack_conn_id)
+
+            if not getattr(conn, 'password', None):
+                raise AirflowException('Missing token (password) in Slack connection')
+            return conn.password
+        else:
+            raise AirflowException('Cannot get token: no valid Slack token nor slack_conn_id supplied.')
+
+    def call(self, method, api_params):
+        sc = SlackClient(self.token)
+        rc = sc.api_call(method, **api_params)
+
+        if not rc['ok']:
+            msg = "Slack API call failed ({})".format(rc['error'])
+            raise AirflowException(msg)
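
For illustration, the hook above can be constructed either way (a minimal
sketch; 'slack_default' is an assumed connection id):

    from airflow.hooks.slack_hook import SlackHook

    # Token supplied directly (then visible to both schedulers and workers):
    hook = SlackHook(token='xoxb-not-a-real-token')

    # Token stored in the password field of a connection, resolved on the worker:
    hook = SlackHook(slack_conn_id='slack_default')
    hook.call('chat.postMessage', {'channel': '#general', 'text': 'hello'})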

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/operators/slack_operator.py
--
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 8b21211..8398a7a 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -14,9 +14,9 @@
 
 import json
 
-from slackclient import SlackClient
 from airflow.models import BaseOper

incubator-airflow git commit: [AIRFLOW-1787] Fix task instance batch clear and set state bugs

2017-11-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1a7b63eb1 -> 313f5bac4


[AIRFLOW-1787] Fix task instance batch clear and set state bugs

Fixes two bugs: batch clear in the Task Instances
view does not work for task instances in the
RUNNING state, and batch operations in the Task
Instances view fail when manually triggered task
instances are selected, because those use a
different execution date format.

Closes #2759 from yrqls21/fix-ti-batch-clear-n
-set-state-bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/313f5bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/313f5bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/313f5bac

Branch: refs/heads/master
Commit: 313f5bac4a3f804094bcd583e0e5fbc3b5f405bb
Parents: 1a7b63e
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Tue Nov 7 11:22:58 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Nov 7 11:23:00 2017 -0800

--
 airflow/utils/dates.py| 14 ++
 airflow/www/views.py  | 30 --
 tests/utils/test_dates.py |  9 +
 3 files changed, 43 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/utils/dates.py
--
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 43b87f4..81e1c2c 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -225,3 +225,17 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
         second=second,
         microsecond=microsecond)
     return today - timedelta(days=n)
+
+
+def parse_execution_date(execution_date_str):
+    """
+    Parse execution date string to datetime object.
+    """
+    try:
+        # Execution date follows execution date format of scheduled executions,
+        # e.g. '2017-11-02 00:00:00'
+        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S')
+    except ValueError:
+        # Execution date follows execution date format of manually triggered
+        # executions, e.g. '2017-11-05 16:18:30..989729'
+        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f')
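
For illustration, both execution date formats mentioned in the comments above
parse cleanly with the new helper:

    from airflow.utils.dates import parse_execution_date

    parse_execution_date('2017-11-02 00:00:00')          # scheduled run
    parse_execution_date('2017-11-05 16:18:30..989729')  # manually triggered run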

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 81c44b6..a6d788e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -76,7 +76,7 @@ from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils.dates import infer_time_unit, scale_time_units
+from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
@@ -2502,23 +2502,32 @@ class TaskInstanceModelView(ModelViewOnly):
         try:
             TI = models.TaskInstance
 
+            dag_to_task_details = {}
             dag_to_tis = {}
 
-            for id in ids:
-                task_id, dag_id, execution_date = id.split(',')
+            # Collect dags upfront as dagbag.get_dag() will reset the session
+            for id_str in ids:
+                task_id, dag_id, execution_date = id_str.split(',')
+                dag = dagbag.get_dag(dag_id)
+                task_details = dag_to_task_details.setdefault(dag, [])
+                task_details.append((task_id, execution_date))
 
-                ti = session.query(TI).filter(TI.task_id == task_id,
-                                              TI.dag_id == dag_id,
-                                              TI.execution_date == execution_date).one()
+            for dag, task_details in dag_to_task_details.items():
+                for task_id, execution_date in task_details:
+                    execution_date = parse_execution_date(execution_date)
 
-                dag = dagbag.get_dag(dag_id)
-                tis = dag_to_tis.setdefault(dag, [])
-                tis.append(ti)
+                    ti = session.query(TI).filter(TI.task_id == task_id,
+                                                  TI.dag_id == dag.dag_id,
+                                                  TI.execution_date == execution_date).one()
+
+                    tis = dag_to_tis.setdefault(dag, [])
+                    tis.append(ti)
 
             for dag, tis in dag_to_tis.items():
                 mode

incubator-airflow git commit: [AIRFLOW-1780] Fix long output lines with unicode from hanging parent

2017-11-06 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3fde1043f -> 1a7b63eb1


[AIRFLOW-1780] Fix long output lines with unicode from hanging parent

Fix long task output lines with unicode
characters hanging the parent process. Tasks
whose output gets piped into a file by the parent
airflow process would hang if the output had long
lines with unicode characters.

Closes #2758 from aoen/ddavydov--
fix_unicode_output_string


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1a7b63eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1a7b63eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1a7b63eb

Branch: refs/heads/master
Commit: 1a7b63eb16ffa1cb97cb09f71997dcd39f28e645
Parents: 3fde104
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Nov 6 16:01:59 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Nov 6 16:02:02 2017 -0800

--
 airflow/task_runner/base_task_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1a7b63eb/airflow/task_runner/base_task_runner.py
--
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 6a07db2..f4b4f2d 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin):
             line = line.decode('utf-8')
             if len(line) == 0:
                 break
-            self.log.info('Subtask: %s', line.rstrip('\n'))
+            self.log.info(u'Subtask: %s', line.rstrip('\n'))
 
     def run_command(self, run_with, join_args=False):
         """



[jira] [Commented] (AIRFLOW-1143) Tasks rejected by workers get stuck in QUEUED

2017-11-06 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241002#comment-16241002
 ] 

Dan Davydov commented on AIRFLOW-1143:
--

It may fix it; I'm not sure we ever identified the specific root cause. To 
have full confidence, I think we should add a test that checks for this.

> Tasks rejected by workers get stuck in QUEUED
> -
>
> Key: AIRFLOW-1143
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1143
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>    Reporter: Dan Davydov
>Assignee: Gerard Toonstra
>
> If the scheduler schedules a task that is sent to a worker that then rejects 
> the task (e.g. because one of the dependencies of the tasks became bad, like 
> the pool became full), the task will be stuck in the QUEUED state. We hit 
> this trying to switch from invoking the scheduler "airflow scheduler -n 5" to 
> just "airflow scheduler".
> Restarting the scheduler fixes this because it cleans up orphans, but we 
> shouldn't have to restart the scheduler to fix these problems (the missing 
> job heartbeats should make the scheduler requeue the task).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1780) Long Unicode Characters In BashOperator Output Cause Task Hang

2017-11-02 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1780:
-
Summary: Long Unicode Characters In BashOperator Output Cause Task Hang  
(was: Long Unicode Characters Cause Logging)

> Long Unicode Characters In BashOperator Output Cause Task Hang
> --
>
> Key: AIRFLOW-1780
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1780
> Project: Apache Airflow
>  Issue Type: Bug
>        Reporter: Dan Davydov
>    Assignee: Dan Davydov
>Priority: Major
>
> Conditions to replicate:
> Create a DAG with a single BashOperator that logs ~10 long lines of text with 
> at least one unicode character (e.g. cat a file with these contents)
> Use BashTaskRunner
> Log to a file
> Run a worker that picks up the task or an airflow run --local command
> Behavior:
> The BashOperator/cat command hangs
> Most likely this is due to a pipe issue, where the unicode characters are 
> filling up the pipe and .format() is not able to process partial unicode 
> characters. Interestingly, removing one of the conditions above (e.g. logging 
> to stdout instead of a file, or having short lines) avoids the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1780) Long Unicode Characters Cause Logging

2017-11-02 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1780:


 Summary: Long Unicode Characters Cause Logging
 Key: AIRFLOW-1780
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1780
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov
Assignee: Dan Davydov
Priority: Major


Conditions to replicate:
Create a DAG with a single BashOperator that logs ~10 long lines of text with 
at least one unicode character (e.g. cat a file with these contents)
Use BashTaskRunner
Log to a file
Run a worker that picks up the task or an airflow run --local command

Behavior:
The BashOperator/cat command hangs

Most likely this is due to a pipe issue, where the unicode characters are 
filling up the pipe and .format() is not able to process partial unicode 
characters. Interestingly, removing one of the conditions above (e.g. logging 
to stdout instead of a file, or having short lines) avoids the issue.
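
A minimal reproduction sketch (the DAG id and file path are illustrative;
assumes /tmp/long_unicode_lines.txt holds ~10 long lines with unicode
characters):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG('unicode_hang_repro', start_date=datetime(2017, 11, 1),
              schedule_interval=None)

    # cat the file; with the BashTaskRunner and file-based logging, this hangs
    repro = BashOperator(task_id='cat_unicode',
                         bash_command='cat /tmp/long_unicode_lines.txt',
                         dag=dag)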



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1733) BashOperator readline can hang

2017-10-18 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1733:


 Summary: BashOperator readline can hang
 Key: AIRFLOW-1733
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1733
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov


If the child bash process dies for some reason, the BashOperator will hang 
since we use a blocking readline call to get output from the child. Instead we 
should read asynchronously or poll the child bash process, as described in the 
solutions here:
https://stackoverflow.com/questions/18421757/live-output-from-subprocess-command
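
A minimal sketch of the polling approach (illustrative only, not the actual
fix; assumes a Unix-like platform where pipes can be passed to select):

    import select
    import subprocess

    proc = subprocess.Popen(['bash', '-c', 'echo hi; sleep 2; echo bye'],
                            stdout=subprocess.PIPE)

    while True:
        # Wait up to 1s for output instead of blocking indefinitely on readline().
        ready, _, _ = select.select([proc.stdout], [], [], 1.0)
        if ready:
            line = proc.stdout.readline()
            if not line:  # EOF: the child closed its end of the pipe
                break
            print(line.decode('utf-8').rstrip('\n'))
        elif proc.poll() is not None:
            break  # child exited and produced no more output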



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1631] Fix timing issue in unit test

2017-10-17 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master cdfced324 -> cb868f49f


[AIRFLOW-1631] Fix timing issue in unit test

LocalWorker instances wait 1 sec for each unit of
work they perform, so getting the response of a
processor takes at least 1 sec after the unit of
work completes.

This increases the timeout in
LocalTaskJobTest.test_mark_success_no_kill and
decreases the polling interval the local executor
uses when checking for results in the unlimited
implementation.

Closes #2699 from edgarRd/erod-fix-timing-failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb868f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb868f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb868f49

Branch: refs/heads/master
Commit: cb868f49f0b6144bd6488cbde5bdf2811001f6ac
Parents: cdfced3
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Oct 17 17:51:34 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Oct 17 17:51:58 2017 -0700

--
 airflow/executors/local_executor.py | 2 +-
 tests/jobs.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb868f49/airflow/executors/local_executor.py
--
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 9b4f8e1..71bee22 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -160,7 +160,7 @@ class LocalExecutor(BaseExecutor):
     def end(self):
         while self.executor.workers_active > 0:
             self.executor.sync()
-            time.sleep(1)
+            time.sleep(0.5)
 
     class _LimitedParallelism(object):
         """Implements LocalExecutor with limited parallelism using a task queue to

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb868f49/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index ba08fd6..88589d8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -860,7 +860,7 @@ class LocalTaskJobTest(unittest.TestCase):
         session.merge(ti)
         session.commit()
 
-        process.join(timeout=5)
+        process.join(timeout=10)
         self.assertFalse(process.is_alive())
         ti.refresh_from_db()
         self.assertEqual(State.SUCCESS, ti.state)



incubator-airflow git commit: [AIRFLOW-1631] Fix local executor unbound parallelism

2017-10-17 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 707ab6952 -> cdfced324


[AIRFLOW-1631] Fix local executor unbound parallelism

Before, if unlimited parallelism was used by
passing `0` for the parallelism value, the local
executor would stall execution since no worker
was being created, violating the BaseExecutor
contract on the parallelism option.

Now, if unbounded parallelism is used, a process
is created on demand for each task submitted for
execution.

Closes #2658 from edgarRd/erod-localexecutor-fix
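
A rough sketch of the unlimited-parallelism strategy (simplified; a real
worker runs an airflow command and reports an Airflow State):

    import multiprocessing

    class OnDemandWorker(multiprocessing.Process):
        """Run one unit of work and report (key, state) on a shared queue."""

        def __init__(self, result_queue, key, command):
            super(OnDemandWorker, self).__init__()
            self.daemon = True
            self.result_queue = result_queue
            self.key = key
            self.command = command

        def run(self):
            # Simplified: pretend the command succeeded.
            self.result_queue.put((self.key, 'success'))

    if __name__ == '__main__':
        result_queue = multiprocessing.Queue()
        # With parallelism == 0, one worker is spawned per submitted task:
        OnDemandWorker(result_queue, key='task-1', command='echo hi').start()
        print(result_queue.get())  # ('task-1', 'success')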


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdfced32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdfced32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdfced32

Branch: refs/heads/master
Commit: cdfced3248c7f14b639919c093f4f3042deb754b
Parents: 707ab69
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Oct 17 11:39:06 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Oct 17 11:39:22 2017 -0700

--
 airflow/executors/local_executor.py| 205 +++-
 tests/executors/test_local_executor.py |  74 ++
 2 files changed, 244 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/airflow/executors/local_executor.py
--
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f9eceb3..9b4f8e1 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -11,6 +11,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+LocalExecutor runs tasks by spawning processes in a controlled fashion in different
+modes. Given that BaseExecutor has the option to receive a `parallelism` parameter to
+limit the number of process spawned, when this parameter is `0` the number of processes
+that LocalExecutor can spawn is unlimited.
+
+The following strategies are implemented:
+1. Unlimited Parallelism (self.parallelism == 0): In this strategy, LocalExecutor will
+spawn a process every time `execute_async` is called, that is, every task submitted to the
+LocalExecutor will be executed in its own process. Once the task is executed and the
+result stored in the `result_queue`, the process terminates. There is no need for a
+`task_queue` in this approach, since as soon as a task is received a new process will be
+allocated to the task. Processes used in this strategy are of class LocalWorker.
+
+2. Limited Parallelism (self.parallelism > 0): In this strategy, the LocalExecutor spawns
+the number of processes equal to the value of `self.parallelism` at `start` time,
+using a `task_queue` to coordinate the ingestion of tasks and the work distribution among
+the workers, which will take a task as soon as they are ready. During the lifecycle of
+the LocalExecutor, the worker processes are running waiting for tasks, once the
+LocalExecutor receives the call to shutdown the executor a poison token is sent to the
+workers to terminate them. Processes used in this strategy are of class QueuedLocalWorker.
+
+Arguably, `SequentialExecutor` could be thought as a LocalExecutor with limited
+parallelism of just 1 worker, i.e. `self.parallelism = 1`.
+This option could lead to the unification of the executor implementations, running
+locally, into just one `LocalExecutor` with multiple modes.
+"""
 
 import multiprocessing
 import subprocess
@@ -18,20 +45,63 @@ import time
 
 from builtins import range
 
-from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
-PARALLELISM = configuration.get('core', 'PARALLELISM')
-
 
 class LocalWorker(multiprocessing.Process, LoggingMixin):
+
+"""LocalWorker Process implementation to run airflow commands. Executes 
the given
+command and puts the result into a result queue when done, terminating 
execution."""
+
+def __init__(self, result_queue):
+"""
+:param result_queue: the queue to store result states tuples (key, 
State)
+:type result_queue: multiprocessing.Queue
+"""
+super(LocalWorker, self).__init__()
+self.daemon = True
+self.result_queue = result_queue
+self.key = None
+self.command = None
+
+def execute_work(self, key, command):
+"""
+Exe

[jira] [Commented] (AIRFLOW-1670) Make Airflow Viewable By Color-blind Users

2017-10-12 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202413#comment-16202413
 ] 

Dan Davydov commented on AIRFLOW-1670:
--

Took a stab at it here: https://github.com/apache/incubator-airflow/pull/2654
But it wasn't received very well; we should have a dedicated color-blind mode.

> Make Airflow Viewable By Color-blind Users
> --
>
> Key: AIRFLOW-1670
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1670
> Project: Apache Airflow
>  Issue Type: Bug
>        Reporter: Dan Davydov
>    Assignee: Dan Davydov
>
> Currently the success and failed states are hard for Airflow users to 
> differentiate; RGB(204,255,204) for the success state works better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view

2017-10-11 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 98b4df945 -> 9f2c16a0a


[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch clear selected task
instance(s) in the task instance view. Only the
state(s) of the selected task instance(s) will be
cleared--no upstream or downstream task instances
will be affected.

The DAG(s) involved will be set to the "RUNNING"
state, same as the existing "clear" operation.

Both the "Delete" and "Clear" operations are kept
for a smoother transition of user habits--the DAG
state change is announced in a pop-up (see
screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-instance-view


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0

Branch: refs/heads/master
Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5
Parents: 98b4df9
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Wed Oct 11 14:05:33 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Oct 11 14:05:35 2017 -0700

--
 airflow/www/views.py | 62 +++
 1 file changed, 31 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bc63b5b..81ee61f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -37,7 +37,7 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
-    abort, redirect, url_for, request, Markup, Response, current_app, render_template, 
+    abort, redirect, url_for, request, Markup, Response, current_app, render_template,
     make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
@@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly):
         'start_date', 'end_date', 'duration', 'job_id', 'hostname',
         'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
         'pool', 'log_url')
-    can_delete = True
     page_size = PAGE_SIZE
 
 @action('set_running', "Set state to 'running'", None)
@@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly):
     def action_set_retry(self, ids):
         self.set_task_instance_state(ids, State.UP_FOR_RETRY)
 
-    @action('delete',
-            lazy_gettext('Delete'),
-            lazy_gettext('Are you sure you want to delete selected records?'))
-    def action_delete(self, ids):
-        """
-        As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete().
-
-        TODO: this method should be removed once the below bug is fixed on Flask-Admin side.
-        https://github.com/flask-admin/flask-admin/issues/1226
-        """
-        if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-            self.delete_task_instances(ids)
-        else:
-            super(TaskInstanceModelView, self).action_delete(ids)
-
     @provide_session
-    def set_task_instance_state(self, ids, target_state, session=None):
+    @action('clear',
+            lazy_gettext('Clear'),
+            lazy_gettext(
+                'Are you sure you want to clear the state of the selected task instance(s)'
+                ' and set their dagruns to the running state?'))
+    def action_clear(self, ids, session=None):
         try:
             TI = models.TaskInstance
-            count = len(ids)
+
+            dag_to_tis = {}
+
             for id in ids:
                 task_id, dag_id, execution_date = id.split(',')
-                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+
                 ti = session.query(TI).filter(TI.task_id == task_id,
                                               TI.dag_id == dag_id,
                                               TI.execution_date == execution_date).one()
-                ti.state = target_state
+
+                dag = dagbag.get_dag(dag_id)
+                tis = dag_to_tis.setdefault(dag, [])
+                tis.append(ti)
+
+            for dag, tis in dag_to_tis.items():
+                models.clear_task_instances(tis, session, dag=dag)
+
             session.commit()
-            flash(
-                "{count} task instances were set to '{target_state}'".format(**locals()))
+            flash("{0} task instances have been cleared".format(len(ids)))
+
         except Exception as ex:
             if no

[jira] [Resolved] (AIRFLOW-1697) Mode to disable Airflow Charts

2017-10-10 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-1697.
--
Resolution: Fixed

> Mode to disable Airflow Charts
> --
>
> Key: AIRFLOW-1697
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1697
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
>    Reporter: Dan Davydov
>    Assignee: Dan Davydov
>
> Mode to disable Airflow Charts



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1697] Mode to disable charts endpoint

2017-10-10 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ebe715c56 -> 21e94c7d1


[AIRFLOW-1697] Mode to disable charts endpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/21e94c7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/21e94c7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/21e94c7d

Branch: refs/heads/master
Commit: 21e94c7d1594c5e0806d9e1ae1205a41bf98b5d3
Parents: ebe715c
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Oct 9 14:46:38 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Oct 10 11:33:50 2017 -0700

--
 UPDATING.md  | 2 ++
 airflow/config_templates/default_airflow.cfg | 4 
 airflow/www/app.py   | 7 +--
 airflow/www/views.py | 9 -
 scripts/ci/airflow_travis.cfg| 1 +
 5 files changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/UPDATING.md
--
diff --git a/UPDATING.md b/UPDATING.md
index 6a0b8bc..ebcb5cd 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -270,6 +270,8 @@ supported and will be removed entirely in Airflow 2.0
   Previously, `Operator.__init__()` accepted any arguments (either positional 
`*args` or keyword `**kwargs`) without
   complaint. Now, invalid arguments will be rejected. 
(https://github.com/apache/incubator-airflow/pull/1285)
 
+- The config value secure_mode will default to True, which will disable some insecure endpoints/features.
+
 ### Known Issues
 There is a report that the default of "-1" for num_runs creates an issue where 
errors are reported while parsing tasks.
 It was not confirmed, but a workaround was found by changing the default back 
to `None`.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/config_templates/default_airflow.cfg
--
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index b051583..dee6dc7 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -117,6 +117,10 @@ default_impersonation =
 # What security module to use (for example kerberos):
 security =
 
+# If set to False, enables some insecure features like Charts. In 2.0 this
+# will default to True.
+secure_mode = False
+
 # Turn unit test mode on (overwrites many configuration options with test
 # values at runtime)
 unit_test_mode = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/www/app.py
--
diff --git a/airflow/www/app.py b/airflow/www/app.py
index bbb9410..dfdc04c 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -22,6 +22,7 @@ from flask_wtf.csrf import CSRFProtect
 csrf = CSRFProtect()
 
 import airflow
+from airflow import configuration as conf
 from airflow import models, LoggingMixin
 from airflow.settings import Session
 
@@ -69,8 +70,10 @@ def create_app(config=None, testing=False):
     av(vs.Airflow(name='DAGs', category='DAGs'))
 
     av(vs.QueryView(name='Ad Hoc Query', category="Data Profiling"))
-    av(vs.ChartModelView(
-        models.Chart, Session, name="Charts", category="Data Profiling"))
+
+    if not conf.getboolean('core', 'secure_mode'):
+        av(vs.ChartModelView(
+            models.Chart, Session, name="Charts", category="Data Profiling"))
     av(vs.KnownEventView(
         models.KnownEvent,
         Session, name="Known Events", category="Data Profiling"))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/21e94c7d/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ad27238..bc63b5b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -37,7 +37,8 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
-    redirect, url_for, request, Markup, Response, current_app, render_template, make_response)
+    abort, redirect, url_for, request, Markup, Response, current_app, render_template,
+    make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
 from flask_admin.actions import action
@@ -299,6 +300,9 @@ class Airflow(BaseView):
 def chart_data(self):
 from airflow import macros

[jira] [Created] (AIRFLOW-1697) Mode to disable Airflow Charts

2017-10-09 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1697:


 Summary: Mode to disable Airflow Charts
 Key: AIRFLOW-1697
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1697
 Project: Apache Airflow
  Issue Type: Bug
  Components: ui
Reporter: Dan Davydov
Assignee: Dan Davydov


Mode to disable Airflow Charts



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1681) Create way to batch retry task instances in the CRUD

2017-10-04 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1681:


 Summary: Create way to batch retry task instances in the CRUD
 Key: AIRFLOW-1681
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1681
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov


The old way to batch retry tasks was to select them on the Task Instances page 
on the webserver and do a With Selected -> Delete.

This no longer works, as you will get overlapping task instance logs (e.g. the 
first retry log will be placed in the same location as the first try log). We 
need an option in the CRUD called With Selected -> Retry that does the same 
thing as With Selected -> Delete but follows the logic for task clearing (sets 
state to none, increases max_tries). Once this feature is stable, With Selected 
-> Delete should probably be removed, as it leads to bad states with the logs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1670) Make Airflow Viewable By Color-blind Users

2017-10-02 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1670:


 Summary: Make Airflow Viewable By Color-blind Users
 Key: AIRFLOW-1670
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1670
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov
Assignee: Dan Davydov


Currently the success and failed states are hard for Airflow users to 
differentiate; RGB(204,255,204) for the success state works better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1651) Accessibility Mode

2017-09-27 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1651:


 Summary: Accessibility Mode
 Key: AIRFLOW-1651
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1651
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov


Colorblind users have a lot of trouble using Airflow (e.g. red-green colorblind 
users can't tell failed and successful tasks apart). We should have an 
accessibility mode to help them differentiate states (colors with more contrast 
and maybe different shapes).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1621] Add tests for server side paging

2017-09-19 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6e520704f -> 656d045e9


[AIRFLOW-1621] Add tests for server side paging

Adding tests to check logic included in
AIRFLOW-1519.

Closes #2614 from edgarRd/erod-ui-dags-paging-
tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/656d045e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/656d045e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/656d045e

Branch: refs/heads/master
Commit: 656d045e90bf67ca484a3778b2a07a419bfb324a
Parents: 6e52070
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Sep 19 15:52:26 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Sep 19 15:52:30 2017 -0700

--
 airflow/www/utils.py| 26 
 tests/www/test_utils.py | 73 
 2 files changed, 87 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/656d045e/airflow/www/utils.py
--
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 344a4e9..96293a2 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -76,6 +76,20 @@ class DataProfilingMixin(object):
 )
 
 
+def get_params(**kwargs):
+    params = []
+    for k, v in kwargs.items():
+        if k == 'showPaused':
+            # True is default or None
+            if v or v is None:
+                continue
+            params.append('{}={}'.format(k, v))
+        elif v:
+            params.append('{}={}'.format(k, v))
+    params = sorted(params, key=lambda x: x.split('=')[0])
+    return '&'.join(params)
+
+
 def generate_pages(current_page, num_of_pages,
                    search=None, showPaused=None, window=7):
     """
@@ -103,18 +117,6 @@ def generate_pages(current_page, num_of_pages,
     the HTML string of the paging component
     """
 
-    def get_params(**kwargs):
-        params = []
-        for k, v in kwargs.items():
-            if k == 'showPaused':
-                # True is default or None
-                if v or v is None:
-                    continue
-                params.append('{}={}'.format(k, v))
-            elif v:
-                params.append('{}={}'.format(k, v))
-        return '&'.join(params)
-
     void_link = 'javascript:void(0)'
     first_node = """
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/656d045e/tests/www/test_utils.py
--
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index bb0860f..d13a49d 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import unittest
+from xml.dom import minidom
 
 from airflow.www import utils
 
@@ -31,6 +32,78 @@ class UtilsTest(unittest.TestCase):
     def test_sensitive_variable_should_be_hidden_ic(self):
         self.assertTrue(utils.should_hide_value_for_key("GOOGLE_API_KEY"))
 
+    def check_generate_pages_html(self, current_page, total_pages,
+                                  window=7, check_middle=False):
+        extra_links = 4  # first, prev, next, last
+        html_str = utils.generate_pages(current_page, total_pages)
+
+        # dom parser has issues with special  and 
+        html_str = html_str.replace('', '')
+        html_str = html_str.replace('', '')
+        dom = minidom.parseString(html_str)
+        self.assertIsNotNone(dom)
+
+        ulist = dom.getElementsByTagName('ul')[0]
+        ulist_items = ulist.getElementsByTagName('li')
+        self.assertEqual(min(window, total_pages) + extra_links, len(ulist_items))
+
+        def get_text(nodelist):
+            rc = []
+            for node in nodelist:
+                if node.nodeType == node.TEXT_NODE:
+                    rc.append(node.data)
+            return ''.join(rc)
+
+        page_items = ulist_items[2:-2]
+        mid = int(len(page_items) / 2)
+        for i, item in enumerate(page_items):
+            a_node = item.getElementsByTagName('a')[0]
+            href_link = a_node.getAttribute('href')
+            node_text = get_text(a_node.childNodes)
+            if node_text == str(current_page + 1):
+                if check_middle:
+                    self.assertEqual(mid, i)
+                self.assertEqual('javascript:void(0)', a_node.getAttribute('href'))
+                self.assertIn('active', item.getAttribute('class'))
+            else:
+                link_str = '?page=' + str(int(node_text) - 1)
+                self.assertEqual(link_str, href_link)
+
+    def te
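
For illustration, the relocated get_params helper behaves like this:

    from airflow.www import utils

    utils.get_params(page=1, search='foo')      # 'page=1&search=foo'
    utils.get_params(page=2, showPaused=True)   # 'page=2' (True is the default)
    utils.get_params(page=2, showPaused=False)  # 'page=2&showPaused=False'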

[jira] [Created] (AIRFLOW-1624) Consolidate airflow clear and airflow run

2017-09-19 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1624:


 Summary: Consolidate airflow clear and airflow run
 Key: AIRFLOW-1624
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1624
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov


Now that task clearing behaves in a non-destructive way (creates new task 
instance runs), it makes sense to consolidate it with run in the UI.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1620) Support granular and customizable hook retries

2017-09-18 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1620:


 Summary: Support granular and customizable hook retries
 Key: AIRFLOW-1620
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1620
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov


We need some kind of abstraction to allow for granular and customizable retry 
handling for hooks. For example, some users might want to catch certain types 
of exceptions and apply a customized retry policy (e.g. exponential backoff), 
depending on their use cases.

One possible solution is to use dependency injection (e.g. hooks need to be 
passed into operators).
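
As one possible shape (purely illustrative; none of these names exist in
Airflow today):

    import time

    def with_retries(exceptions, tries=3, base_delay=1.0, backoff=2.0):
        """Retry the wrapped call on the given exceptions with exponential backoff."""
        def decorator(func):
            def wrapper(*args, **kwargs):
                delay = base_delay
                for attempt in range(tries):
                    try:
                        return func(*args, **kwargs)
                    except exceptions:
                        if attempt == tries - 1:
                            raise
                        time.sleep(delay)
                        delay *= backoff
            return wrapper
        return decorator

    @with_retries((IOError,), tries=5)
    def fetch_from_service():
        pass  # a hook method body would go here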



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1519] Add server side paging in DAGs list

2017-09-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6632b0ce1 -> b6d2e0a46


[AIRFLOW-1519] Add server side paging in DAGs list

Airflow's main page previously did paging
client-side via a jQuery plugin (DataTable),
which was very slow at loading all DAGs:
the browser loaded every DAG into the table.
The result was performance degradation with
a number of DAGs in the range of 1K.

This commit implements server-side paging using
the webserver page size setting, sending to the
browser only the elements for the specific page.

Closes #2531 from edgarRd/erod-ui-dags-paging
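
The server-side query amounts to a limit/offset page over the DAG table (a
minimal sketch; assumes a SQLAlchemy `session`, with DagModel from
airflow.models):

    from airflow.models import DagModel

    current_page, page_size = 2, 25  # illustrative values

    dags = (session.query(DagModel)
            .order_by(DagModel.dag_id)
            .offset(current_page * page_size)
            .limit(page_size)
            .all())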


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b6d2e0a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b6d2e0a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b6d2e0a4

Branch: refs/heads/master
Commit: b6d2e0a46978e93e16576604624f57d1388814f2
Parents: 6632b0c
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Fri Sep 15 16:41:25 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Sep 15 16:41:29 2017 -0700

--
 airflow/www/static/bootstrap3-typeahead.min.js |  21 
 airflow/www/templates/airflow/dags.html|  76 +++-
 airflow/www/utils.py   | 121 
 airflow/www/views.py   | 105 +
 licenses/LICENSE-typeahead.txt |  13 +++
 5 files changed, 308 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b6d2e0a4/airflow/www/static/bootstrap3-typeahead.min.js
--
diff --git a/airflow/www/static/bootstrap3-typeahead.min.js b/airflow/www/static/bootstrap3-typeahead.min.js
new file mode 100644
index 000..23aac4e
--- /dev/null
+++ b/airflow/www/static/bootstrap3-typeahead.min.js
@@ -0,0 +1,21 @@
+/* =
+ * bootstrap3-typeahead.js v4.0.2
+ * https://github.com/bassjobsen/Bootstrap-3-Typeahead
+ * =
+ * Original written by @mdo and @fat
+ * =
+ * Copyright 2014 Bass Jobsen @bassjobsen
+ *
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *  */
+!function(a,b){"use strict";"undefined"!=typeof 
module&?module.exports=b(require("jquery")):"function"==typeof 
define&?define(["jquery"],function(a){return 
b(a)}):b(a.jQuery)}(this,function(a){"use strict";var 
b=function(c,d){this.$element=a(c),this.options=a.extend({},b.defaults,d),this.matcher=this.options.matcher||this.matcher,this.sorter=this.options.sorter||this.sorter,this.select=this.options.select||this.select,this.autoSelect="boolean"!=typeof
 
this.options.autoSelect||this.options.autoSelect,this.highlighter=this.options.highlighter||this.highlighter,this.render=this.options.render||this.render,this.updater=this.options.updater||this.updater,this.displayText=this.options.displayText||this.displayText,this.source=this.options.source,this.delay=this.options.delay,this.$menu=a(this.options.menu),this.$appendTo=this.options.appendTo?a(this.options.appendTo):null,this.fitToElement="boolean"==typeof
 this.options.fitToElement&,thi
 s.shown=!1,this.listen(),this.showHintOnFocus=("boolean"==typeof 
this.options.showHintOnFocus||"all"===this.options.showHintOnFocus)&,this.afterSelect=this.options.afterSelect,this.addItem=!1,this.value=this.$element.val()||this.$element.text()};b.prototype={constructor:b,select:function(){var
 
a=this.$menu.find(".active").data("value");if(this.$element.data("active",a),this.autoSelect||a){var
 
b=this.updater(a);b||(b=""),this.$element.val(this.displayText(b)||b).text(this.displayText(b)||b).change(),this.afterSelect(b)}return
 this.hide()},updater:function(a){return 
a},setSource:function(a){this.source=a},show:function(){var 
d,b=a.extend({},this.$element.position(),{height:this.$element[0].offsetHeight}),c

[jira] [Updated] (AIRFLOW-1584) Remove the insecure /headers endpoints

2017-09-08 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1584:
-
Description: 
Impact: An XSS vulnerability on Airflow would be able to read a user's 
auth_proxy cookie, granting the attacker access to any other InternalAuth-gated 
application on Airbnb's network.

Target: The endpoint at 
https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
https://airflow-precious.d.musta.ch/admin/airflow/headers

Description: The endpoint listed in the Target section returns the headers sent 
by the user's browser, including the Cookie header. Since this endpoint can be 
called by JavaScript on the Airflow domain, this allows JS running on Airflow 
to ignore the HTTPOnly directive that the auth_proxy (and potentially other) 
cookie sets. This means that malicious JavaScript can steal the auth_proxy 
cookie and use it to authenticate to other InternalAuth services.


{code:java}
This can be demonstrated by running the following JavaScript snippet in any 
Airflow tab:
$.get("/admin/airflow/headers", function(data) 
{alert(data['headers']['Cookie']);})
{code}


Remediation: Disable this endpoint entirely. If some of the headers are 
important they can be added to the gunicorn request log format.

  was:
Impact: An XSS vulnerability on Airflow would be able to read a user's 
auth_proxy cookie, granting the attacker access to any other InternalAuth-gated 
application on Airbnb's network.

Target: The endpoint at 
https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
https://airflow-precious.d.musta.ch/admin/airflow/headers

Description: The endpoint listed in the Target section returns the headers sent 
by the user's browser, including the Cookie header. Since this endpoint can be 
called by JavaScript on the Airflow domain, this allows JS running on Airflow 
to ignore the HTTPOnly directive that the auth_proxy (and potentially other) 
cookie sets. This means that malicious JavaScript can steal the auth_proxy 
cookie and use it to authenticate to other InternalAuth services.

This can be demonstrated by running the following JavaScript snippet in any 
Airflow tab:
$.get("/admin/airflow/headers", function(data) 
{alert(data['headers']['Cookie']);})

Remediation: Disable this endpoint entirely. If some of the headers are 
important they can be added to the gunicorn request log format.


> Remove the insecure /headers endpoints
> --
>
> Key: AIRFLOW-1584
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1584
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Dan Davydov
>Assignee: Dan Davydov
>
> Impact: An XSS vulnerability on Airflow would be able to read a user's 
> auth_proxy cookie, granting the attacker access to any other 
> InternalAuth-gated application on Airbnb's network.
> Target: The endpoint at 
> https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
> https://airflow-precious.d.musta.ch/admin/airflow/headers
> Description: The endpoint listed in the Target section returns the headers 
> sent by the user's browser, including the Cookie header. Since this endpoint 
> can be called by JavaScript on the Airflow domain, this allows JS running on 
> Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially 
> other) cookie sets. This means that malicious JavaScript can steal the 
> auth_proxy cookie and use it to authenticate to other InternalAuth services.
> {code:java}
> This can be demonstrated by running the following JavaScript snippet in any 
> Airflow tab:
> $.get("/admin/airflow/headers", function(data) 
> {alert(data['headers']['Cookie']);})
> {code}
> Remediation: Disable this endpoint entirely. If some of the headers are 
> important they can be added to the gunicorn request log format.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1584) Remove the insecure /headers endpoints

2017-09-08 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1584:
-
Description: 
Impact: An XSS vulnerability on Airflow would be able to read a user's 
auth_proxy cookie, granting the attacker access to any other InternalAuth-gated 
application on Airbnb's network.

Target: The endpoint at 
https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
https://airflow-precious.d.musta.ch/admin/airflow/headers

Description: The endpoint listed in the Target section returns the headers sent 
by the user's browser, including the Cookie header. Since this endpoint can be 
called by JavaScript on the Airflow domain, this allows JS running on Airflow 
to ignore the HTTPOnly directive that the auth_proxy (and potentially other) 
cookie sets. This means that malicious JavaScript can steal the auth_proxy 
cookie and use it to authenticate to other InternalAuth services.


This can be demonstrated by running the following JavaScript snippet in any 
Airflow tab:
{code:java}
$.get("/admin/airflow/headers", function(data) 
{alert(data['headers']['Cookie']);})
{code}


Remediation: Disable this endpoint entirely. If some of the headers are 
important they can be added to the gunicorn request log format.

  was:
Impact: An XSS vulnerability on Airflow would be able to read a user's 
auth_proxy cookie, granting the attacker access to any other InternalAuth-gated 
application on Airbnb's network.

Target: The endpoint at 
https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
https://airflow-precious.d.musta.ch/admin/airflow/headers

Description: The endpoint listed in the Target section returns the headers sent 
by the user's browser, including the Cookie header. Since this endpoint can be 
called by JavaScript on the Airflow domain, this allows JS running on Airflow 
to ignore the HTTPOnly directive that the auth_proxy (and potentially other) 
cookie sets. This means that malicious JavaScript can steal the auth_proxy 
cookie and use it to authenticate to other InternalAuth services.


{code:java}
This can be demonstrated by running the following JavaScript snippet in any 
Airflow tab:
$.get("/admin/airflow/headers", function(data) 
{alert(data['headers']['Cookie']);})
{code}


Remediation: Disable this endpoint entirely. If some of the headers are 
important they can be added to the gunicorn request log format.


> Remove the insecure /headers endpoints
> --
>
> Key: AIRFLOW-1584
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1584
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Dan Davydov
>Assignee: Dan Davydov
>
> Impact: An XSS vulnerability on Airflow would be able to read a user's 
> auth_proxy cookie, granting the attacker access to any other 
> InternalAuth-gated application on Airbnb's network.
> Target: The endpoint at 
> https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
> https://airflow-precious.d.musta.ch/admin/airflow/headers
> Description: The endpoint listed in the Target section returns the headers 
> sent by the user's browser, including the Cookie header. Since this endpoint 
> can be called by JavaScript on the Airflow domain, this allows JS running on 
> Airflow to ignore the HTTPOnly directive that the auth_proxy (and potentially 
> other) cookie sets. This means that malicious JavaScript can steal the 
> auth_proxy cookie and use it to authenticate to other InternalAuth services.
> This can be demonstrated by running the following JavaScript snippet in any 
> Airflow tab:
> {code:java}
> $.get("/admin/airflow/headers", function(data) 
> {alert(data['headers']['Cookie']);})
> {code}
> Remediation: Disable this endpoint entirely. If some of the headers are 
> important they can be added to the gunicorn request log format.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1584) Remove the insecure /headers endpoints

2017-09-08 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1584:


 Summary: Remove the insecure /headers endpoints
 Key: AIRFLOW-1584
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1584
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov
Assignee: Dan Davydov


Impact: An XSS vulnerability on Airflow would be able to read a user's 
auth_proxy cookie, granting the attacker access to any other InternalAuth-gated 
application on Airbnb's network.

Target: The endpoint at 
https://airflow-main-proxy.d.musta.ch/admin/airflow/headers or 
https://airflow-precious.d.musta.ch/admin/airflow/headers

Description: The endpoint listed in the Target section returns the headers sent 
by the user's browser, including the Cookie header. Since this endpoint can be 
called by JavaScript on the Airflow domain, this allows JS running on Airflow 
to ignore the HTTPOnly directive that the auth_proxy (and potentially other) 
cookie sets. This means that malicious JavaScript can steal the auth_proxy 
cookie and use it to authenticate to other InternalAuth services.

This can be demonstrated by running the following JavaScript snippet in any 
Airflow tab:
$.get("/admin/airflow/headers", function(data) 
{alert(data['headers']['Cookie']);})

Remediation: Disable this endpoint entirely. If some of the headers are 
important they can be added to the gunicorn request log format.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1574] add 'to' attribute to templated vars of email operator

2017-09-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8f1ec4dee -> 2d4069448


[AIRFLOW-1574] add 'to' attribute to templated vars of email operator

The 'to' field may sometimes need to be
template-able, e.g. when you have a DAG that uses
XCom to find the user to send the information to
(i.e. we have a form that a user submits, and
based on the LDAP user we send this specific user
the information). It's a rather easy fix to add
the 'to' field to the template-able options.

Closes #2577 from Acehaidrey/AIRFLOW-1574


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d406944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d406944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d406944

Branch: refs/heads/master
Commit: 2d40694482df7c9c1c02a61e11d51bedb9034a93
Parents: 8f1ec4d
Author: Ace Haidrey <ahaid...@pandora.com>
Authored: Thu Sep 7 11:53:16 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Sep 7 11:53:20 2017 -0700

--
 airflow/operators/email_operator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d406944/airflow/operators/email_operator.py
--
diff --git a/airflow/operators/email_operator.py b/airflow/operators/email_operator.py
index 5167a7a..6eaae95 100644
--- a/airflow/operators/email_operator.py
+++ b/airflow/operators/email_operator.py
@@ -36,7 +36,7 @@ class EmailOperator(BaseOperator):
     :type bcc: list or string (comma or semicolon delimited)
     """
 
-    template_fields = ('subject', 'html_content')
+    template_fields = ('to', 'subject', 'html_content')
     template_ext = ('.html',)
     ui_color = '#e6faf9'
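
With 'to' template-able, the recipient can be pulled from XCom at render time
(a minimal sketch; the upstream task id is illustrative and `dag` is assumed
to exist):

    from airflow.operators.email_operator import EmailOperator

    email = EmailOperator(
        task_id='notify_submitter',
        to="{{ ti.xcom_pull(task_ids='lookup_user') }}",  # rendered per run
        subject='Your request for {{ ds }} is done',
        html_content='See attached results.',
        dag=dag,
    )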
 



incubator-airflow git commit: [AIRFLOW-1541] Add channel to template fields of slack_operator

2017-08-30 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b1f902e63 -> 9450d8db6


[AIRFLOW-1541] Add channel to template fields of slack_operator

Closes #2549 from Acehaidrey/AIRFLOW-1541


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9450d8db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9450d8db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9450d8db

Branch: refs/heads/master
Commit: 9450d8db6f005d3c5ad84c23c484bbb119e112a8
Parents: b1f902e
Author: Ace Haidrey <ahaid...@pandora.com>
Authored: Wed Aug 30 11:52:47 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Aug 30 11:52:48 2017 -0700

--
 airflow/operators/slack_operator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9450d8db/airflow/operators/slack_operator.py
--
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 2e6d426..86659d9 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -86,7 +86,7 @@ class SlackAPIPostOperator(SlackAPIOperator):
 :type attachments: array of hashes
 """
 
-template_fields = ('username', 'text', 'attachments')
+template_fields = ('username', 'text', 'attachments', 'channel')
 ui_color = '#FFBA40'
 
 @apply_defaults
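
A minimal usage sketch of the newly templated channel field (the token value
and the dag object are placeholders):

    from airflow.operators.slack_operator import SlackAPIPostOperator

    post = SlackAPIPostOperator(
        task_id='post_update',
        token='my-slack-api-token',  # placeholder; keep real tokens out of DAG files
        # rendered per run, e.g. from a manual trigger's payload
        channel="{{ (dag_run.conf or {}).get('channel', '#data-eng') }}",
        username='airflow',
        text='Run for {{ ds }} finished',
        dag=dag)  # dag assumed defined elsewhere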



[jira] [Commented] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs

2017-08-15 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128134#comment-16128134
 ] 

Dan Davydov commented on AIRFLOW-1311:
--

Edgar, assigning this to you; feel free to mark it as a dupe or merge if you 
have an existing JIRA ticket.

> Improve Webserver Load Time For Large DAGs
> --
>
> Key: AIRFLOW-1311
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1311
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>    Reporter: Dan Davydov
>Assignee: Edgar Rodriguez
>
> Large DAGs can take an extremely long time to load in the Airflow UI 
> (minutes/timeout).
> The fixes are as follows:
> 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing 
> tasks by their depth, and allow users to expand sections for these DAGs, 
> ideally prefetch deeper tasks once the initial set of tasks has rendered )
> 2. Identify bottlenecks/performance issues in both the frontend/backend for 
> rendering DAGs on the webserver and fix them. Airflow should be more 
> performant for displaying DAGs that are somewhat large, e.g. DAGs that have 
> up to 500 nodes and 2000 edges (dependencies from one task to another) should 
> render within a couple of seconds.
> 3. Make DAG loading asynchronous in the UI (once the top-level tasks have 
> loaded display them immediately). We might not want to do this as users might 
> try to click something only to have the UI change from underneath them
> [~saguziel]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (AIRFLOW-1266) Long task names are truncated in Gantt view

2017-08-15 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-1266.
--
Resolution: Fixed

> Long task names are truncated in Gantt view
> ---
>
> Key: AIRFLOW-1266
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1266
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>    Reporter: Dan Davydov
> Fix For: 1.8.2
>
>
> Long task names are truncated in the Gantt view; we should make the text smaller 
> instead. To reproduce, create a task called 
> abczzzdef and note that the start/end characters 
> are not fully visible in the Gantt view.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs

2017-08-15 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov reassigned AIRFLOW-1311:


Assignee: Edgar Rodriguez  (was: Dan Davydov)

> Improve Webserver Load Time For Large DAGs
> --
>
> Key: AIRFLOW-1311
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1311
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>    Reporter: Dan Davydov
>Assignee: Edgar Rodriguez
>
> Large DAGs can take an extremely long time to load in the Airflow UI 
> (minutes/timeout).
> The fixes are as follows:
> 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing 
> tasks by their depth, and allow users to expand sections for these DAGs, 
> ideally prefetch deeper tasks once the initial set of tasks has rendered )
> 2. Identify bottlenecks/performance issues in both the frontend/backend for 
> rendering DAGs on the webserver and fix them. Airflow should be more 
> performant for displaying DAGs that are somewhat large, e.g. DAGs that have 
> up to 500 nodes and 2000 edges (dependencies from one task to another) should 
> render within a couple of seconds.
> 3. Make DAG loading asynchronous in the UI (once the top-level tasks have 
> loaded display them immediately). We might not want to do this as users might 
> try to click something only to have the UI change from underneath them
> [~saguziel]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs

2017-08-15 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov reassigned AIRFLOW-1311:


Assignee: Dan Davydov

> Improve Webserver Load Time For Large DAGs
> --
>
> Key: AIRFLOW-1311
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1311
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>    Reporter: Dan Davydov
>    Assignee: Dan Davydov
>
> Large DAGs can take an extremely long time to load in the Airflow UI 
> (minutes/timeout).
> The fixes are as follows:
> 1. Lazy load DAGs (load up to a certain # of tasks by default, prioritizing 
> tasks by their depth, and allow users to expand sections for these DAGs, 
> ideally prefetch deeper tasks once the initial set of tasks has rendered )
> 2. Identify bottlenecks/performance issues in both the frontend/backend for 
> rendering DAGs on the webserver and fix them. Airflow should be more 
> performant for displaying DAGs that are somewhat large, e.g. DAGs that have 
> up to 500 nodes and 2000 edges (dependencies from one task to another) should 
> render within a couple of seconds.
> 3. Make DAG loading asynchronous in the UI (once the top-level tasks have 
> loaded display them immediately). We might not want to do this as users might 
> try to click something only to have the UI change from underneath them
> [~saguziel]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (AIRFLOW-1442) Airflow Ignore All Deps Command Generation Has an Extra Space

2017-08-15 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-1442.
--
Resolution: Fixed

> Airflow Ignore All Deps Command Generation Has an Extra Space
> -
>
> Key: AIRFLOW-1442
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1442
> Project: Apache Airflow
>  Issue Type: Bug
>        Reporter: Dan Davydov
>    Assignee: Dan Davydov
>
> Airflow Ignore All Deps Command Generation Has an Extra Space. This causes 
> airflow commands, e.g. those generated by the webserver, to fail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1495] Fix migration on index on job_id

2017-08-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 04bfba3aa -> 67b47c958


[AIRFLOW-1495] Fix migration on index on job_id

There was a merge conflict on the migration hash for down revision at the
time that two commits including migrations were merged.

This commit restores the chain of revisions for the migrations, pointing to
the last one. The job_id index migration was regenerated from the top
migration.

Closes #2524 from edgarRd/erod-ti-jobid-index-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67b47c95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67b47c95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67b47c95

Branch: refs/heads/master
Commit: 67b47c958903a2297916b44e97adc289d6184b5a
Parents: 04bfba3
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Aug 15 15:27:04 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Aug 15 15:27:06 2017 -0700

--
 .../7171349d4c73_add_ti_job_id_index.py | 38 
 .../947454bf1dff_add_ti_job_id_index.py | 38 
 2 files changed, 38 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b47c95/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
--
diff --git a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py 
b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
deleted file mode 100644
index b7e2be6..0000000
--- a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""add ti job_id index
-
-Revision ID: 7171349d4c73
-Revises: cc1e65623dc7
-Create Date: 2017-08-14 18:08:50.196042
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '7171349d4c73'
-down_revision = 'cc1e65623dc7'
-branch_labels = None
-depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
-op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False)
-
-
-def downgrade():
-op.drop_index('ti_job_id', table_name='task_instance')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b47c95/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
--
diff --git a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py 
b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
new file mode 100644
index 0000000..b0817c3
--- /dev/null
+++ b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add ti job_id index
+
+Revision ID: 947454bf1dff
+Revises: bdaa763e6c56
+Create Date: 2017-08-15 15:12:13.845074
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '947454bf1dff'
+down_revision = 'bdaa763e6c56'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False)
+
+
+def downgrade():
+op.drop_index('ti_job_id', table_name='task_instance')



incubator-airflow git commit: [AIRFLOW-1483] Making page size consistent in list

2017-08-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e1772c008 -> 04bfba3aa


[AIRFLOW-1483] Making page size consistent in list

Views showing model listings had large page sizes which made page loading
really slow client-side, mostly due to DOM processing and JS plugin
rendering. Also, the page size was inconsistent across some listings.

This commit introduces a configurable page size, and by default it'll use a
page_size = 100. Also, the same page size is applied to all the model views
controlled by flask_admin to be consistent.

Closes #2497 from edgarRd/erod-ui-page-size-conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/04bfba3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/04bfba3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/04bfba3a

Branch: refs/heads/master
Commit: 04bfba3aa97deab850c14763279d33a6dfceb205
Parents: e1772c0
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Aug 15 15:01:17 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Aug 15 15:01:19 2017 -0700

--
 airflow/config_templates/default_airflow.cfg | 3 +++
 airflow/config_templates/default_test.cfg| 1 +
 airflow/www/views.py | 9 +
 3 files changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/config_templates/default_airflow.cfg
--
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index b568d3a..948c72c 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -233,6 +233,9 @@ log_fetch_timeout_sec = 5
 # DAGs by default
 hide_paused_dags_by_default = False
 
+# Consistent page size across all listing views in the UI
+page_size = 100
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/config_templates/default_test.cfg
--
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index 4452ffa..2a090d4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -57,6 +57,7 @@ dag_orientation = LR
 dag_default_view = tree
 log_fetch_timeout_sec = 5
 hide_paused_dags_by_default = False
+page_size = 100
 
 [email]
 email_backend = airflow.utils.email.send_email_smtp

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/04bfba3a/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f813a0b..80b9dd3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -91,6 +91,8 @@ logout_user = airflow.login.logout_user
 
 FILTER_BY_OWNER = False
 
+PAGE_SIZE = conf.getint('webserver', 'page_size')
+
 if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
 # filter_by_owner if authentication is enabled and filter_by_owner is true
 FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
@@ -1966,7 +1968,7 @@ class AirflowModelView(ModelView):
 edit_template = 'airflow/model_edit.html'
 create_template = 'airflow/model_create.html'
 column_display_actions = True
-page_size = 500
+page_size = PAGE_SIZE
 
 
 class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView):
@@ -2283,7 +2285,6 @@ class VariableView(wwwutils.DataProfilingMixin, 
AirflowModelView):
 class XComView(wwwutils.SuperUserMixin, AirflowModelView):
 verbose_name = "XCom"
 verbose_name_plural = "XComs"
-page_size = 20
 
 form_columns = (
 'key',
@@ -2438,7 +2439,7 @@ class TaskInstanceModelView(ModelViewOnly):
 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
 'pool', 'log_url')
 can_delete = True
-page_size = 500
+page_size = PAGE_SIZE
 
 @action('set_running', "Set state to 'running'", None)
 def action_set_running(self, ids):
@@ -2704,7 +2705,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
 )
 can_delete = False
 can_create = False
-page_size = 50
+page_size = PAGE_SIZE
 list_template = 'airflow/list_dags.html'
 named_filter_urls = True
 



incubator-airflow git commit: [AIRFLOW-1495] Add TaskInstance index on job_id

2017-08-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4cf904cf5 -> e1772c008


[AIRFLOW-1495] Add TaskInstance index on job_id

Column job_id is unindexed in TaskInstance; it was used as default sort
column in TaskInstanceView.

This commit adds the required migration to add the index on
task_instance.job_id on future db upgrades.

Closes #2520 from edgarRd/erod-ti-jobid-index


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1772c00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1772c00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1772c00

Branch: refs/heads/master
Commit: e1772c008d607a2545ddaa05508b1a74473be0ec
Parents: 4cf904c
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Tue Aug 15 14:57:26 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Aug 15 14:57:28 2017 -0700

--
 .../7171349d4c73_add_ti_job_id_index.py | 38 
 1 file changed, 38 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1772c00/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
--
diff --git a/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py 
b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
new file mode 100644
index 0000000..b7e2be6
--- /dev/null
+++ b/airflow/migrations/versions/7171349d4c73_add_ti_job_id_index.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add ti job_id index
+
+Revision ID: 7171349d4c73
+Revises: cc1e65623dc7
+Create Date: 2017-08-14 18:08:50.196042
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '7171349d4c73'
+down_revision = 'cc1e65623dc7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False)
+
+
+def downgrade():
+op.drop_index('ti_job_id', table_name='task_instance')



[jira] [Created] (AIRFLOW-1513) Come up with an abstraction to set defaults per task

2017-08-15 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1513:


 Summary: Come up with an abstraction to set defaults per task
 Key: AIRFLOW-1513
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1513
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov


Come up with an abstraction to set defaults per task, e.g. all HiveOperator 
tasks will have some specific retry policy that can be set globally by Airflow 
users.
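
One possible shape for such an abstraction (purely a hypothetical sketch, not 
an existing Airflow API; the dag object is assumed defined) is a site-wide 
factory that bakes defaults into an operator class:

    from datetime import timedelta
    from functools import partial

    from airflow.operators.hive_operator import HiveOperator

    # every task built through this factory inherits the site-wide retry policy
    SiteHiveOperator = partial(HiveOperator, retries=3,
                               retry_delay=timedelta(minutes=10))

    run_hql = SiteHiveOperator(task_id='run_hql', hql='SELECT 1', dag=dag)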



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-855] Replace PickleType with LargeBinary in XCom

2017-08-15 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 984a87c0c -> 4cf904cf5


[AIRFLOW-855] Replace PickleType with LargeBinary in XCom

PickleType in XCom allows remote code execution. In order to deprecate it
without changing the mysql table schema, change PickleType to LargeBinary,
because they both map to the blob type in mysql. Add "enable_pickling" to
the function signature to control using either pickle type or JSON.
"enable_pickling" should also be added to the core section of airflow.cfg.

Picked up where https://github.com/apache/incubator-airflow/pull/2132 left
off. Took this PR, fixed merge conflicts, added documentation/tests, fixed
broken tests/operators, and fixed the python3 issues.

Closes #2518 from aoen/disable-pickle-type


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cf904cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cf904cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cf904cf

Branch: refs/heads/master
Commit: 4cf904cf5a7a070bbeaf3a0e985ed2b840276015
Parents: 984a87c
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Tue Aug 15 12:24:02 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Aug 15 12:24:07 2017 -0700

--
 UPDATING.md |   6 +
 airflow/config_templates/default_airflow.cfg|   4 +
 airflow/config_templates/default_test.cfg   |   1 +
 airflow/contrib/operators/ssh_operator.py   |  10 +-
 ...c56_make_xcom_value_column_a_large_binary.py |  45 
 airflow/models.py   | 110 ++-
 tests/contrib/operators/test_sftp_operator.py   |  84 +-
 tests/contrib/operators/test_ssh_operator.py|  24 +++-
 tests/models.py |  87 +++
 9 files changed, 338 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/UPDATING.md
--
diff --git a/UPDATING.md b/UPDATING.md
index 3a880ab..92ee4b4 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -32,6 +32,12 @@ supported and will be removed entirely in Airflow 2.0
 
 - `contrib.hooks.gcp_dataflow_hook.DataFlowHook` starts to use 
`--runner=DataflowRunner` instead of `DataflowPipelineRunner`, which is removed 
from the package `google-cloud-dataflow-0.6.0`.
 
+- The pickle type for XCom messages has been replaced by json to prevent RCE attacks.
+  Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
+  raw bytes through XCom you must encode them using an encoding like base64.
+  By default pickling is still enabled until Airflow 2.0. To disable it 
+  Set enable_xcom_pickling = False in your Airflow config.
+
 ## Airflow 1.8.1
 
 The Airflow package name was changed from `airflow` to `apache-airflow` during 
this release. You must uninstall your

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_airflow.cfg
--
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index dcb99ed..b568d3a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -129,6 +129,10 @@ logging_config_path =
 # Default to use file task handler.
 task_log_reader = file.task
 
+# Whether to enable pickling for xcom (note that this is insecure and allows for
+# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
+enable_xcom_pickling = True
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_test.cfg
--
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index 88b19a5..4452ffa 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -37,6 +37,7 @@ dag_concurrency = 16
 dags_are_paused_at_creation = False
 fernet_key = {FERNET_KEY}
 non_pooled_task_slot_count = 128
+enable_xcom_pickling = False
 
 [cli]
 api_client = airflow.api.client.local_client

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/contrib/operators/ssh_operator.py
--
diff --git a/airflow/contrib/operators/ssh_operator.py 
b/airflow/contrib/operators/ssh_operat
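
To illustrate the base64 note from the UPDATING.md hunk above (a minimal
sketch; the task ids are hypothetical):

    import base64

    def push_payload(**context):
        raw = b'\x00\x01raw-bytes'
        # JSON-serialized XComs cannot carry raw bytes, so encode first
        context['ti'].xcom_push(key='payload',
                                value=base64.b64encode(raw).decode('ascii'))

    def pull_payload(**context):
        encoded = context['ti'].xcom_pull(task_ids='push_payload', key='payload')
        return base64.b64decode(encoded)  # original bytes restored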

incubator-airflow git commit: [AIRFLOW-1239] Fix unicode error for logs in base_task_runner

2017-08-14 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 565423a39 -> 42cad6069


[AIRFLOW-1239] Fix unicode error for logs in base_task_runner

The details here are that there exists a PR for this JIRA already
(https://github.com/apache/incubator-airflow/pull/2318). The issue is that
in python 2.7 not all literals are automatically unicode like they are in
python 3. That's the root cause, and it can simply be fixed by explicitly
stating that all literals should be treated as unicode, which is an import
from the `__future__` module.
https://stackoverflow.com/questions/3235386/python-using-format-on-a-unicode-escaped-string
also explains this same solution, which I found helpful.

Closes #2496 from Acehaidrey/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/42cad606
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/42cad606
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/42cad606

Branch: refs/heads/master
Commit: 42cad60698646f617537a7f4ba713fd6b3fc2ecb
Parents: 565423a
Author: Ace Haidrey <ahaid...@pandora.com>
Authored: Mon Aug 14 15:13:01 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Aug 14 15:13:02 2017 -0700

--
 airflow/task_runner/base_task_runner.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/42cad606/airflow/task_runner/base_task_runner.py
--
diff --git a/airflow/task_runner/base_task_runner.py 
b/airflow/task_runner/base_task_runner.py
index 7229be5..bed8eaa 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from __future__ import unicode_literals
 
 import getpass
 import os
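
A small sketch of the failure mode the import avoids (Python 2.7 semantics):

    # -*- coding: utf-8 -*-
    # Without the future import, '{}' is a byte-string literal in Python 2.7,
    # so formatting a non-ASCII unicode value coerces it through the ASCII
    # codec and raises UnicodeEncodeError:
    #     '{}'.format(u'caf\xe9')
    from __future__ import unicode_literals

    # With the import, every literal in the module is unicode, so the same
    # call is safe and returns u'caf\xe9':
    print('{}'.format(u'caf\xe9'))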



incubator-airflow git commit: [AIRFLOW-1452] workaround lock on method

2017-08-11 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0669b532 -> 0d0cc62f4


[AIRFLOW-1452] workaround lock on method

Workaround for a lock on the "has_table" method when mssql is used as the
storage engine.

Closes #2514 from patsak/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d0cc62f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d0cc62f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d0cc62f

Branch: refs/heads/master
Commit: 0d0cc62f49525166bc877606affa5a623ba52c4d
Parents: b0669b5
Author: k.privezentsev <konstantin.privezent...@kaspersky.com>
Authored: Fri Aug 11 11:47:35 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 11 11:47:42 2017 -0700

--
 .../cc1e65623dc7_add_max_tries_column_to_task_instance.py   | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0d0cc62f/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
--
diff --git 
a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
 
b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
index 2d5ffc2..b151e0c 100644
--- 
a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
+++ 
b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -29,6 +29,7 @@ from alembic import op
 import sqlalchemy as sa
 from airflow import settings
 from airflow.models import DagBag, TaskInstance
+from sqlalchemy.engine.reflection import Inspector
 
 BATCH_SIZE = 5000
 
@@ -39,10 +40,12 @@ def upgrade():
 # needed for database that does not create table until migration finishes.
 # Checking task_instance table exists prevent the error of querying
 # non-existing task_instance table.
-engine = settings.engine
-if engine.dialect.has_table(engine, 'task_instance'):
+connection = op.get_bind()
+inspector = Inspector.from_engine(connection)
+tables = inspector.get_table_names()
+
+if 'task_instance' in tables:
 # Get current session
-connection = op.get_bind()
 sessionmaker = sa.orm.sessionmaker()
 session = sessionmaker(bind=connection)
 dagbag = DagBag(settings.DAGS_FOLDER)



incubator-airflow git commit: [AIRFLOW-1385] Make Airflow task logging configurable

2017-08-11 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0bc248fc7 -> b0669b532


[AIRFLOW-1385] Make Airflow task logging configurable

This PR adds configurable task logging to Airflow. Please refer to #2422
for previous discussions. This is the first step of making entire Airflow
logging configurable
([AIRFLOW-1454](https://issues.apache.org/jira/browse/AIRFLOW-1454)).

Closes #2464 from AllisonWang/allison--log-abstraction


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0669b53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0669b53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0669b53

Branch: refs/heads/master
Commit: b0669b532a7be9aa34a4390951deaa25897c62e6
Parents: 0bc248f
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Fri Aug 11 11:38:37 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 11 11:38:39 2017 -0700

--
 airflow/bin/cli.py  | 106 ++---
 airflow/config_templates/__init__.py|  13 ++
 airflow/config_templates/default_airflow.cfg|   7 +
 .../config_templates/default_airflow_logging.py |  94 
 airflow/dag/__init__.py |   1 -
 airflow/settings.py |  14 ++
 airflow/task_runner/base_task_runner.py |   2 +
 airflow/utils/log/__init__.py   |  13 ++
 airflow/utils/log/file_task_handler.py  | 176 +++
 airflow/utils/log/gcs_task_handler.py   |  95 
 airflow/utils/log/s3_task_handler.py|  91 
 airflow/utils/logging.py|  47 +---
 airflow/www/views.py| 225 +++
 tests/utils/test_log_handlers.py|  73 ++
 tests/utils/test_logging.py |  17 +-
 15 files changed, 687 insertions(+), 287 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 077cb90..e9e60cb 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,7 +22,6 @@ import os
 import socket
 import subprocess
 import textwrap
-import warnings
 from importlib import import_module
 
 import argparse
@@ -54,8 +53,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils import logging as logging_utils
-from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -357,61 +354,22 @@ def run(args, dag=None):
 ti = TaskInstance(task, args.execution_date)
 ti.refresh_from_db()
 
-logging.root.handlers = []
+logger = logging.getLogger('airflow.task')
 if args.raw:
-# Output to STDOUT for the parent process to read and log
-logging.basicConfig(
-stream=sys.stdout,
-level=settings.LOGGING_LEVEL,
-format=settings.LOG_FORMAT)
-else:
-# Setting up logging to a file.
-
-# To handle log writing when tasks are impersonated, the log files need to
-# be writable by the user that runs the Airflow command and the user
-# that is impersonated. This is mainly to handle corner cases with the
-# SubDagOperator. When the SubDagOperator is run, all of the operators
-# run under the impersonated user and create appropriate log files
-# as the impersonated user. However, if the user manually runs tasks
-# of the SubDagOperator through the UI, then the log files are created
-# by the user that runs the Airflow command. For example, the Airflow
-# run command may be run by the `airflow_sudoable` user, but the Airflow
-# tasks may be run by the `airflow` user. If the log files are not
-# writable by both users, then it's possible that re-running a task
-# via the UI (or vice versa) results in a permission error as the task
-# tries to write to a log file created by the other user.
-try_number = ti.try_number
-log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
-   args.execution_date)
-directory = os.path.join(log_base, log_relative_dir)
-# Create the log file and give it group writable permissions
-# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
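
The mechanism underneath is the standard logging dictConfig: task logs flow
through a dedicated 'airflow.task' logger whose handler can be swapped out.
A minimal illustrative config (not the shipped default_airflow_logging
template; the path and format string are placeholders):

    import logging.config

    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow': {
                'format': '[%(asctime)s] {%(filename)s:%(lineno)d} '
                          '%(levelname)s - %(message)s',
            },
        },
        'handlers': {
            'task': {
                'class': 'logging.FileHandler',
                'formatter': 'airflow',
                'filename': '/tmp/airflow-task.log',  # placeholder location
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['task'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)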

[jira] [Resolved] (AIRFLOW-1349) Max active dagrun check for backfills shouldn't include the backfilled dagrun

2017-08-09 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-1349.
--
Resolution: Fixed

> Max active dagrun check for backfills shouldn't include the backfilled dagrun
> -
>
> Key: AIRFLOW-1349
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1349
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill
>    Reporter: Dan Davydov
>Assignee: Edgar Rodriguez
>
> When you backfill a dag with e.g. 1 max active dagrun, if that dagrun is 
> already running then it shouldn't count against the max active dagruns of the 
> backfill and make the backfill fail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1443] Update Airflow configuration documentation

2017-08-09 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d9109d645 -> 6825d97b8


[AIRFLOW-1443] Update Airflow configuration documentation

This PR updates the Airflow configuration documentation to include a recent
change to split task logs by try number (#2383).

Closes #2467 from AllisonWang/allison--update-doc


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6825d97b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6825d97b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6825d97b

Branch: refs/heads/master
Commit: 6825d97b82a3b235685ea8265380a20eea90c990
Parents: d9109d6
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Wed Aug 9 14:49:54 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Aug 9 14:49:56 2017 -0700

--
 UPDATING.md| 29 -
 docs/configuration.rst | 15 ---
 2 files changed, 24 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6825d97b/UPDATING.md
--
diff --git a/UPDATING.md b/UPDATING.md
index a02ff04..3a880ab 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -9,8 +9,11 @@ assists people when migrating to a new version.
   SSH Hook now uses Paramiko library to create ssh client connection, instead 
of sub-process based ssh command execution previously (<1.9.0), so this is 
backward incompatible.
   - update SSHHook constructor
   - use SSHOperator class in place of SSHExecuteOperator which is removed now. 
Refer test_ssh_operator.py for usage info.
-  - SFTPOperator is added to perform secure file transfer from serverA to 
serverB. Refer test_sftp_operator.py.py for usage info. 
-  - No updates are required if you are using ftpHook, it will continue work as 
is. 
+  - SFTPOperator is added to perform secure file transfer from serverA to 
serverB. Refer test_sftp_operator.py.py for usage info.
+  - No updates are required if you are using ftpHook, it will continue work as 
is.
+
+### Logging update
+  Logs now are stored in the log folder as 
``{dag_id}/{task_id}/{execution_date}/{try_number}.log``.
 
 ### New Features
 
@@ -61,8 +64,8 @@ interfere.
 Please read through these options, defaults have changed since 1.7.1.
 
  child_process_log_directory
-In order the increase the robustness of the scheduler, DAGS our now processed 
in their own process. Therefore each 
-DAG has its own log file for the scheduler. These are placed in 
`child_process_log_directory` which defaults to 
+In order the increase the robustness of the scheduler, DAGS our now processed 
in their own process. Therefore each
+DAG has its own log file for the scheduler. These are placed in 
`child_process_log_directory` which defaults to
 `/scheduler/latest`. You will need to make sure these log files 
are removed.
 
 > DAG logs or processor logs ignore and command line settings for log file 
 > locations.
@@ -72,7 +75,7 @@ Previously the command line option `num_runs` was used to let 
the scheduler term
 loops. This is now time bound and defaults to `-1`, which means run 
continuously. See also num_runs.
 
  num_runs
-Previously `num_runs` was used to let the scheduler terminate after a certain 
amount of loops. Now num_runs specifies 
+Previously `num_runs` was used to let the scheduler terminate after a certain 
amount of loops. Now num_runs specifies
 the number of times to try to schedule each DAG file within `run_duration` 
time. Defaults to `-1`, which means try
 indefinitely. This is only available on the command line.
 
@@ -85,7 +88,7 @@ dags are not being picked up, have a look at this number and 
decrease it when ne
 
  catchup_by_default
 By default the scheduler will fill any missing interval DAG Runs between the 
last execution date and the current date.
-This setting changes that behavior to only execute the latest interval. This 
can also be specified per DAG as 
+This setting changes that behavior to only execute the latest interval. This 
can also be specified per DAG as
 `catchup = False / True`. Command line backfills will still work.
 
 ### Faulty Dags do not show an error in the Web UI
@@ -109,33 +112,33 @@ convenience variables to the config. In case your run a 
sceure Hadoop setup it m
 required to whitelist these variables by adding the following to your 
configuration:
 
 ```
- <property> 
+ <property>
   <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
   <value>airflow\.ctx\..*</value>
  </property>
 ```
 ### Google Cloud Operator and Hook alignment
 
-All Google Cloud Operators and Hooks are aligned and use the same client 
library. Now you have a single connection 
+All Google Cloud Operators and Hooks are aligned and use the s

[jira] [Closed] (AIRFLOW-672) Allow logs to be piped into another process

2017-08-09 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov closed AIRFLOW-672.
---
Resolution: Fixed

Not really relevant with the new logging abstraction anymore. We can now 
create a logging handler that pipes to a file if we want.

> Allow logs to be piped into another process
> ---
>
> Key: AIRFLOW-672
> URL: https://issues.apache.org/jira/browse/AIRFLOW-672
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: logging
>    Reporter: Dan Davydov
>
> Instead of writing logs to a file Airflow should be able to pipe them to a 
> process similar to how Apache Webserver does it: 
> https://httpd.apache.org/docs/1.3/logs.html#piped . The most important reason 
> for this is to allow log rotation of tasks which can get quite large on the 
> local worker disks. Part of this task could be moving the existing s3 log 
> exporting logic (and other custom logging logic) out of the airflow core.
> FWIW I have the code ready to add log rotation to Airflow itself, but this 
> isn't as ideal as the piping solution (separation of concerns). If there is 
> interest in getting this merged as a temporary measure I can do so.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1486] Unexpected S3 writing log error

2017-08-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master f5c845739 -> d9109d645


[AIRFLOW-1486] Unexpected S3 writing log error

Removed unexpected S3 writing log error and added tests for s3 logging.

Closes #2499 from skudriashev/airflow-1486


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9109d64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9109d64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9109d64

Branch: refs/heads/master
Commit: d9109d6458d136cd2b76ef7180be498ae09b3ea3
Parents: f5c8457
Author: Stanislav Kudriashev <stas.kudrias...@gmail.com>
Authored: Mon Aug 7 15:52:51 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Aug 7 15:52:54 2017 -0700

--
 airflow/utils/logging.py|  40 +++
 tests/utils/test_logging.py | 107 +++
 2 files changed, 117 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/airflow/utils/logging.py
--
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index b86d839..6e18e52 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -91,10 +91,13 @@ class S3Log(object):
 except:
 pass
 
-# raise/return error if we get here
-err = 'Could not read logs from {}'.format(remote_log_location)
-logging.error(err)
-return err if return_error else ''
+# return error if needed
+if return_error:
+msg = 'Could not read logs from {}'.format(remote_log_location)
+logging.error(msg)
+return msg
+
+return ''
 
 def write(self, log, remote_log_location, append=True):
 """
@@ -108,25 +111,21 @@ class S3Log(object):
 :param append: if False, any existing log file is overwritten. If True,
 the new log is appended to any existing logs.
 :type append: bool
-
 """
 if self.hook:
-
 if append:
 old_log = self.read(remote_log_location)
-log = old_log + '\n' + log
+log = '\n'.join([old_log, log])
+
 try:
 self.hook.load_string(
 log,
 key=remote_log_location,
 replace=True,
-encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'))
-return
+encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
+)
 except:
-pass
-
-# raise/return error if we get here
-logging.error('Could not write logs to {}'.format(remote_log_location))
+logging.error('Could not write logs to {}'.format(remote_log_location))
 
 
 class GCSLog(object):
@@ -183,10 +182,13 @@ class GCSLog(object):
 except:
 pass
 
-# raise/return error if we get here
-err = 'Could not read logs from {}'.format(remote_log_location)
-logging.error(err)
-return err if return_error else ''
+# return error if needed
+if return_error:
+msg = 'Could not read logs from {}'.format(remote_log_location)
+logging.error(msg)
+return msg
+
+return ''
 
 def write(self, log, remote_log_location, append=True):
 """
@@ -200,12 +202,11 @@ class GCSLog(object):
 :param append: if False, any existing log file is overwritten. If True,
 the new log is appended to any existing logs.
 :type append: bool
-
 """
 if self.hook:
 if append:
 old_log = self.read(remote_log_location)
-log = old_log + '\n' + log
+log = '\n'.join([old_log, log])
 
 try:
 bkt, blob = self.parse_gcs_url(remote_log_location)
@@ -218,7 +219,6 @@ class GCSLog(object):
 tmpfile.flush()
 self.hook.upload(bkt, blob, tmpfile.name)
 except:
-# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))
 
 def parse_gcs_url(self, gsurl):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/tests/utils/test_logging.py
--
diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py
index 474430f..3f9f5d6 100644
--- a/tests/utils/test_logging.py
+++ b/tests/utils/test_logging.p

incubator-airflow git commit: [AIRFLOW-1487] Added links to all companies officially using Airflow

2017-08-07 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 111ce574c -> f5c845739


[AIRFLOW-1487] Added links to all companies officially using Airflow

Closes #2458 from tanmaythakur/patch-1


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f5c84573
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f5c84573
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f5c84573

Branch: refs/heads/master
Commit: f5c845739c8ca69364958dd696603879d87b5b58
Parents: 111ce57
Author: Tanmay <tanmaythak...@gmail.com>
Authored: Mon Aug 7 11:39:10 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Aug 7 11:39:36 2017 -0700

--
 README.md | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f5c84573/README.md
--
diff --git a/README.md b/README.md
index a6a048d..3c85942 100644
--- a/README.md
+++ b/README.md
@@ -94,7 +94,7 @@ Currently **officially** using Airflow:
 1. [Bellhops](https://github.com/bellhops)
 1. [BlaBlaCar](https://www.blablacar.com) 
[[@puckel](https://github.com/puckel) & [@wmorin](https://github.com/wmorin)]
 1. [Bloc](https://www.bloc.io) [[@dpaola2](https://github.com/dpaola2)]
-1. BlueApron [[@jasonjho](https://github.com/jasonjho) & 
[@matthewdavidhauser](https://github.com/matthewdavidhauser)]
+1. [BlueApron](https://www.blueapron.com) 
[[@jasonjho](https://github.com/jasonjho) & 
[@matthewdavidhauser](https://github.com/matthewdavidhauser)]
 1. [Blue Yonder](http://www.blue-yonder.com) 
[[@blue-yonder](https://github.com/blue-yonder)]
 1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) 
& [@chadcelect](https://github.com/chadcelect)]
 1. [Change.org](https://www.change.org) [[@change](https://github.com/change), 
[@vijaykramesh](https://github.com/vijaykramesh)]
@@ -103,7 +103,7 @@ Currently **officially** using Airflow:
 1. [City of San Diego](http://sandiego.gov) 
[[@MrMaksimize](https://github.com/mrmaksimize), 
[@andrell81](https://github.com/andrell81) & 
[@arnaudvedy](https://github.com/arnaudvedy)]
 1. [Clairvoyant](https://clairvoyantsoft.com) 
[@shekharv](https://github.com/shekharv)
 1. [Clover Health](https://www.cloverhealth.com) 
[[@gwax](https://github.com/gwax) & 
[@vansivallab](https://github.com/vansivallab)]
-1. Chartboost [[@cgelman](https://github.com/cgelman) & 
[@dclubb](https://github.com/dclubb)]
+1. [Chartboost](https://www.chartboost.com) 
[[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)]
 1. [Cotap](https://github.com/cotap/) [[@maraca](https://github.com/maraca) & 
[@richardchew](https://github.com/richardchew)]
 1. [Credit Karma](https://www.creditkarma.com/) 
[[@preete-dixit-ck](https://github.com/preete-dixit-ck) & 
[@harish-gaggar-ck](https://github.com/harish-gaggar-ck) & 
[@greg-finley-ck](https://github.com/greg-finley-ck)]
 1. [Digital First Media](http://www.digitalfirstmedia.com/) 
[[@duffn](https://github.com/duffn) & [@mschmo](https://github.com/mschmo) & 
[@seanmuth](https://github.com/seanmuth)]
@@ -164,7 +164,7 @@ Currently **officially** using Airflow:
 1. [SmartNews](https://www.smartnews.com/) [[@takus](https://github.com/takus)]
 1. [Spotify](https://github.com/spotify) 
[[@znichols](https://github.com/znichols)]
 1. [Stackspace](https://beta.stackspace.io/)
-1. Stripe [[@jbalogh](https://github.com/jbalogh)]
+1. [Stripe](https://stripe.com) [[@jbalogh](https://github.com/jbalogh)]
 1. [Tails.com](https://tails.com/) 
[[@alanmcruickshank](https://github.com/alanmcruickshank)]
 1. [Thumbtack](https://www.thumbtack.com/) 
[[@natekupp](https://github.com/natekupp)]
 1. [Tictail](https://tictail.com/)
@@ -176,9 +176,9 @@ Currently **officially** using Airflow:
 1. [WeTransfer](https://github.com/WeTransfer) 
[[@jochem](https://github.com/jochem)]
 1. [Whistle Labs](http://www.whistle.com) 
[[@ananya77041](https://github.com/ananya77041)]
 1. [WiseBanyan](https://wisebanyan.com/)
-1. Wooga
-1. Xoom [[@gepser](https://github.com/gepser) & 
[@omarvides](https://github.com/omarvides)]
-1. Yahoo!
+1. [Wooga](https://www.wooga.com/)
+1. [Xoom](https://www.xoom.com/india/send-money) 
[[@gepser](https://github.com/gepser) & 
[@omarvides](https://github.com/omarvides)]
+1. [Yahoo!](https://www.yahoo.com/)
 1. [Zapier](https://www.zapier.com) [[@drknexus](https://github.com/drknexus) 
& [@statwonk](https://github.com/statwonk)]
 1. [Zendesk](https://www.github.com/zendesk)
 1. [Zenly](https://zen.ly) [[@cerisier](https://github.com/cerisier) & 
[@jbdalido](https://github.com/jbdalido)]



incubator-airflow git commit: [AIRFLOW-1349] Fix backfill to respect limits

2017-08-04 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 651e6063d -> ddc502694


[AIRFLOW-1349] Fix backfill to respect limits

Before, if a backfill job was triggered that would include a dag run
already in a RUNNING state, the dag run within the backfill would be
included in the count against the max_active_runs limit. Also, if a
backfill job generated multiple dag runs it could potentially violate
max_active_runs limits by executing all dag runs.

Now the limit is checked per dag run to be created, and the backfill job
will only run the dag runs within the backfill job that could be included
within the limits. Also, if the max_active_runs limit has already been
reached, the BackfillJob will wait and loop, trying to create the required
dag runs as soon as a dag run slot within the limit is available, until
all dag runs are completed.

These changes provide a more consistent behavior according to the
max_active_runs limits definition and allow the user to run backfill
jobs with existing RUNNING state when already considered within the
limits.

Closes #2454 from edgarRd/erod-fix-backfill-max


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ddc50269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ddc50269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ddc50269

Branch: refs/heads/master
Commit: ddc50269431d8715e0eaeed7be5f522fed5521da
Parents: 651e606
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Fri Aug 4 14:08:17 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 4 14:08:20 2017 -0700

--
 airflow/bin/cli.py |  13 -
 airflow/jobs.py| 144 +---
 airflow/models.py  |  51 -
 tests/jobs.py  | 140 +-
 4 files changed, 271 insertions(+), 77 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index dc49bb7..077cb90 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -179,7 +179,8 @@ def backfill(args, dag=None):
   conf.getboolean('core', 'donot_pickle')),
 ignore_first_depends_on_past=args.ignore_first_depends_on_past,
 ignore_task_deps=args.ignore_dependencies,
-pool=args.pool)
+pool=args.pool,
+delay_on_limit_secs=args.delay_on_limit)
 
 
 def trigger_dag(args):
@@ -1234,6 +1235,14 @@ class CLIFactory(object):
 "DO respect depends_on_past)."),
 "store_true"),
 'pool': Arg(("--pool",), "Resource pool to use"),
+'delay_on_limit': Arg(
+("--delay_on_limit",),
+help=("Amount of time in seconds to wait when the limit "
+  "on maximum active dag runs (max_active_runs) has "
+  "been reached before trying to execute a dag run "
+  "again."),
+type=float,
+default=1.0),
 # list_tasks
 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
 # list_dags
@@ -1491,7 +1500,7 @@ class CLIFactory(object):
 'dag_id', 'task_regex', 'start_date', 'end_date',
 'mark_success', 'local', 'donot_pickle', 'include_adhoc',
 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-'subdir', 'pool', 'dry_run')
+'subdir', 'pool', 'delay_on_limit', 'dry_run')
 }, {
 'func': list_tasks,
 'help': "List the tasks within a DAG",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 668973e..d94a0e0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1837,26 +1837,29 @@ class BackfillJob(BaseJob):
  not_ready=None,
  deadlocked=None,
  active_runs=None,
+ executed_dag_run_dates=None,
  finished_runs=0,
  total_runs=0,
  ):
 """
 :param to_run: Tasks to run in the backfill
-:type to_run: dict
+:type to_run: dict[Tuple[String, String, DateTime], TaskInstance]
 :param started: Maps started task instance key to task instance object

[jira] [Commented] (AIRFLOW-774) dagbag_size/collect_dags/dagbag_import_errors stats incorrect

2017-08-02 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111825#comment-16111825
 ] 

Dan Davydov commented on AIRFLOW-774:
-

Yes to master, IIRC they should appear starting from release 1.8.1.

This is the JIRA:
https://issues.apache.org/jira/browse/AIRFLOW-780

Yep, that only fixed the import errors, not the stats. The changes should be 
similar though (pull the stats logic out to the top level, which aggregates 
the errors from each individual parsing subprocess).

> dagbag_size/collect_dags/dagbag_import_errors stats incorrect
> -
>
> Key: AIRFLOW-774
> URL: https://issues.apache.org/jira/browse/AIRFLOW-774
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>    Reporter: Dan Davydov
>
> After the multiprocessor change was made (dag folders are processed in 
> parallel), the number of dags reported by airflow is for each of these 
> subprocesses which is inaccurate, and potentially orders of magnitude less 
> than the actual number of dags. These individual processes stats should be 
> aggregated. The collect_dags/dagbag_import_errors stats should also be fixed 
> (time it takes to parse the dags).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1445] Changing HivePartitionSensor UI color to lighter shade

2017-08-01 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1932ccc88 -> 836f2899c


[AIRFLOW-1445] Changing HivePartitionSensor UI color to lighter shade

My PR simply improves the readability of the text when using the
HivePartitionSensor. The screenshots below show the before and after:
the darker shade (nearly black) is the before, and the purple color
is the after.

Closes #2476 from Acehaidrey/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/836f2899
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/836f2899
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/836f2899

Branch: refs/heads/master
Commit: 836f2899c6fd225fa8da5b7c697a470f2d2f9d58
Parents: 1932ccc
Author: Ace Haidrey <ahaid...@pandora.com>
Authored: Tue Aug 1 09:20:36 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Aug 1 09:20:39 2017 -0700

--
 airflow/operators/sensors.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/836f2899/airflow/operators/sensors.py
--
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index bfa2ef4..409c18d 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -347,7 +347,7 @@ class HivePartitionSensor(BaseSensorOperator):
 :type metastore_conn_id: str
 """
 template_fields = ('schema', 'table', 'partition',)
-ui_color = '#2b2d42'
+ui_color = '#C5CAE9'
 
 @apply_defaults
 def __init__(



incubator-airflow git commit: [AIRFLOW-1349] Refactor BackfillJob _execute

2017-07-28 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 322ec9609 -> 547f8184b


[AIRFLOW-1349] Refactor BackfillJob _execute

BackfillJob._execute is doing multiple things - it is pretty hard to
follow and maintain.

Changes included are just a re-org of the code, no logic has been
changed.

Refactor includes:
- Break BackfillJob._execute into functions
- Add a Status object to track BackfillJob internal status while
  executing the job.

Closes #2463 from edgarRd/erod-backfill-refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/547f8184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/547f8184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/547f8184

Branch: refs/heads/master
Commit: 547f8184be1e0b3f902faf354fa39579d2f08af2
Parents: 322ec96
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Fri Jul 28 15:49:50 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Jul 28 15:50:24 2017 -0700

--
 airflow/jobs.py   | 515 -
 airflow/models.py |  31 +++
 tests/jobs.py | 189 +-
 3 files changed, 501 insertions(+), 234 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/547f8184/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index e2f8c94..668973e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1817,6 +1817,62 @@ class BackfillJob(BaseJob):
 'polymorphic_identity': 'BackfillJob'
 }
 
+class _DagRunTaskStatus(object):
+"""
+Internal status of the backfill job. This class is intended to be instantiated
+only within a BackfillJob instance and will track the execution of tasks,
+e.g. started, skipped, succeeded, failed, etc. Information about the dag runs
+related to the backfill job are also being tracked in this structure,
+e.g. finished runs, etc. Any other status related information related to the
+execution of dag runs / tasks can be included in this structure since it makes
+it easier to pass it around.
+"""
+# TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
+def __init__(self,
+ to_run=None,
+ started=None,
+ skipped=None,
+ succeeded=None,
+ failed=None,
+ not_ready=None,
+ deadlocked=None,
+ active_runs=None,
+ finished_runs=0,
+ total_runs=0,
+ ):
+"""
+:param to_run: Tasks to run in the backfill
+:type to_run: dict
+:param started: Maps started task instance key to task instance object
+:type started: dict
+:param skipped: Tasks that have been skipped
+:type skipped: set
+:param succeeded: Tasks that have succeeded so far
+:type succeeded: set
+:param failed: Tasks that have failed
+:type failed: set
+:param not_ready: Tasks not ready for execution
+:type not_ready: set
+:param deadlocked: Deadlocked tasks
+:type deadlocked: set
+:param active_runs: Active tasks at a certain point in time
+:type active_runs: list
+:param finished_runs: Number of finished runs so far
+:type finished_runs: int
+:param total_runs: Number of total dag runs able to run
+:type total_runs: int
+"""
+self.to_run = to_run or dict()
+self.started = started or dict()
+self.skipped = skipped or set()
+self.succeeded = succeeded or set()
+self.failed = failed or set()
+self.not_ready = not_ready or set()
+self.deadlocked = deadlocked or set()
+self.active_runs = active_runs or list()
+self.finished_runs = finished_runs
+self.total_runs = total_runs
+
 def __init__(
 self,
 dag,
@@ -1841,41 +1897,38 @@ class BackfillJob(BaseJob):
 self.pool = pool
 super(BackfillJob, self).__init__(*args, **kwargs)
 
-def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
+def _update_counters(self, ti_status):
 """
 Updates the counters per state of the tasks that were running. Can re-add
 to tasks to run in case required.
-

[jira] [Created] (AIRFLOW-1448) Revert PR 2433 which added merge conflicts to master

2017-07-24 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1448:


 Summary: Revert PR 2433 which added merge conflicts to master
 Key: AIRFLOW-1448
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1448
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov
Assignee: Dan Davydov


https://github.com/apache/incubator-airflow/pull/2433 has logical merge 
conflicts in master and causes tests to fail; it needs to be reverted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1442] Remove extra space from ignore_all_deps generated command

2017-07-21 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3547cbffd -> aa64f370b


[AIRFLOW-1442] Remove extra space from ignore_all_deps generated command

Fix the extra whitespace in the ignore_all_deps
arg, which was causing generated commands to fail.
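
For illustration, a minimal sketch (plain argparse, not Airflow's code) of
why the trailing space broke the generated command: argparse treats an
argument containing a space as a positional, so "-A " is never matched to
the -A flag.

{code}
# Minimal sketch under assumed argparse semantics; not Airflow's code.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-A", "--ignore_all_dependencies", action="store_true")

print(parser.parse_args(["-A"]))   # Namespace(ignore_all_dependencies=True)
try:
    parser.parse_args(["-A "])     # trailing space: not recognized as the flag
except SystemExit:
    print("argparse rejected '-A ' as an unrecognized argument")
{code}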

Closes #2468 from aoen/ddavydov--
fix_ignore_all_deps_extra_space


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aa64f370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aa64f370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aa64f370

Branch: refs/heads/master
Commit: aa64f370b28935a9fe0e692864a94fca89113bfe
Parents: 3547cbf
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Fri Jul 21 14:06:29 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Jul 21 14:06:32 2017 -0700

--
 airflow/models.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa64f370/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index c1fd4a3..d1f8e59 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -934,7 +934,7 @@ class TaskInstance(Base):
 cmd.extend(["--mark_success"]) if mark_success else None
 cmd.extend(["--pickle", str(pickle_id)]) if pickle_id else None
 cmd.extend(["--job_id", str(job_id)]) if job_id else None
-cmd.extend(["-A "]) if ignore_all_deps else None
+cmd.extend(["-A"]) if ignore_all_deps else None
 cmd.extend(["-i"]) if ignore_task_deps else None
 cmd.extend(["-I"]) if ignore_depends_on_past else None
 cmd.extend(["--force"]) if ignore_ti_state else None



[jira] [Created] (AIRFLOW-1442) Airflow Ignore All Deps Command Generation Has an Extra Space

2017-07-21 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1442:


 Summary: Airflow Ignore All Deps Command Generation Has an Extra Space
 Key: AIRFLOW-1442
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1442
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dan Davydov
Assignee: Dan Davydov


Airflow Ignore All Deps Command Generation Has an Extra Space. This causes 
airflow commands, e.g. those generated by the webserver, to fail.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [Airflow 1332] Split logs based on try number

2017-07-20 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b9576d57b -> b49986c3b


[Airflow 1332] Split logs based on try number

This PR splits logs based on try number and adds
tabs to display different task instance tries.

**Note this PR is a temporary change for
separating task attempts. The code in this PR will
be refactored in the future. Please refer to #2422
for Airflow logging abstractions redesign.**
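
A hedged sketch of the per-try log layout this enables (the helper and path
layout below are assumed for illustration; the PR's real changes live in
airflow/utils/logging.py and the cli/webserver files in the diff):

{code}
# Hypothetical helper for illustration only.
import os

def log_path_for_try(log_base, dag_id, task_id, execution_date, try_number):
    # One file per attempt, so the UI can render one tab per try.
    return os.path.join(log_base, dag_id, task_id,
                        execution_date.isoformat(), str(try_number) + ".log")
{code}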

Testing:
1. Added unit tests.
2. Tested on localhost.
3. Tested on production environment with S3 remote
storage, MySQL database, Redis, one Airflow
scheduler and two airflow workers.

Closes #2383 from AllisonWang/allison--add-task-
attempt


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b49986c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b49986c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b49986c3

Branch: refs/heads/master
Commit: b49986c3b24a5382f817d5a3fc40add87b464ba2
Parents: b9576d5
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Thu Jul 20 18:08:15 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jul 20 18:08:18 2017 -0700

--
 airflow/bin/cli.py|  81 +--
 airflow/models.py |  50 ---
 airflow/utils/logging.py  |  66 +
 airflow/www/templates/airflow/ti_log.html |  40 ++
 airflow/www/views.py  | 180 +
 dags/test_dag.py  |   4 +-
 docs/scheduler.rst|  19 ++-
 tests/models.py   | 137 ++-
 tests/operators/python_operator.py|  88 +++-
 tests/utils/test_dates.py |   4 -
 tests/utils/test_logging.py   |  29 
 11 files changed, 503 insertions(+), 195 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f568d5d..a8543d3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -59,7 +59,6 @@ from airflow.www.app import cached_app
 from sqlalchemy import func
 from sqlalchemy.orm import exc
 
-
 api.load_auth()
 api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
@@ -316,7 +315,7 @@ def run(args, dag=None):
 # Load custom airflow config
 if args.cfg_path:
 with open(args.cfg_path, 'r') as conf_file:
-   conf_dict = json.load(conf_file)
+conf_dict = json.load(conf_file)
 
 if os.path.exists(args.cfg_path):
 os.remove(args.cfg_path)
@@ -327,6 +326,21 @@ def run(args, dag=None):
 settings.configure_vars()
 settings.configure_orm()
 
+if not args.pickle and not dag:
+dag = get_dag(args)
+elif not dag:
+session = settings.Session()
+logging.info('Loading pickle id {args.pickle}'.format(args=args))
+dag_pickle = session.query(
+DagPickle).filter(DagPickle.id == args.pickle).first()
+if not dag_pickle:
+raise AirflowException("Who hid the pickle!? [missing pickle]")
+dag = dag_pickle.pickle
+
+task = dag.get_task(task_id=args.task_id)
+ti = TaskInstance(task, args.execution_date)
+ti.refresh_from_db()
+
 logging.root.handlers = []
 if args.raw:
 # Output to STDOUT for the parent process to read and log
@@ -350,19 +364,23 @@ def run(args, dag=None):
 # writable by both users, then it's possible that re-running a task
 # via the UI (or vice versa) results in a permission error as the task
 # tries to write to a log file created by the other user.
+try_number = ti.try_number
 log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
+   args.execution_date)
+directory = os.path.join(log_base, log_relative_dir)
 # Create the log file and give it group writable permissions
 # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
 # operator is not compatible with impersonation (e.g. if a Celery executor is used
 # for a SubDag operator and the SubDag operator has a different owner than the
 # parent DAG)
-if not os.path.exists(directory):
+if not os.path.isdir(directory):

incubator-airflow git commit: Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"

2017-07-20 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e6ef06c53 -> b9576d57b


Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"

This reverts commit e6ef06c53fd4449db6e665cce5cad9418dde232f which
was committed accidentally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9576d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9576d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9576d57

Branch: refs/heads/master
Commit: b9576d57b6063908e488654f0b21b338c10069fd
Parents: e6ef06c
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Thu Jul 20 18:07:17 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jul 20 18:07:17 2017 -0700

--
 airflow/bin/cli.py  | 111 ++---
 airflow/config_templates/__init__.py|  13 --
 airflow/config_templates/default_airflow.cfg|   6 -
 .../config_templates/default_airflow_logging.py |  73 -
 airflow/settings.py |  13 +-
 airflow/task_runner/base_task_runner.py |   1 -
 airflow/utils/log/__init__.py   |  13 --
 airflow/utils/log/file_task_handler.py  | 158 ---
 airflow/utils/log/s3_task_handler.py|  90 ---
 airflow/utils/logging.py|  11 --
 airflow/www/views.py| 104 +---
 11 files changed, 177 insertions(+), 416 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 11f415a..f568d5d 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,6 +22,7 @@ import os
 import socket
 import subprocess
 import textwrap
+import warnings
 from importlib import import_module
 
 import argparse
@@ -51,6 +52,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 Connection)
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
+from airflow.utils import logging as logging_utils
+from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -324,6 +327,55 @@ def run(args, dag=None):
 settings.configure_vars()
 settings.configure_orm()
 
+logging.root.handlers = []
+if args.raw:
+# Output to STDOUT for the parent process to read and log
+logging.basicConfig(
+stream=sys.stdout,
+level=settings.LOGGING_LEVEL,
+format=settings.LOG_FORMAT)
+else:
+# Setting up logging to a file.
+
+# To handle log writing when tasks are impersonated, the log files need to
+# be writable by the user that runs the Airflow command and the user
+# that is impersonated. This is mainly to handle corner cases with the
+# SubDagOperator. When the SubDagOperator is run, all of the operators
+# run under the impersonated user and create appropriate log files
+# as the impersonated user. However, if the user manually runs tasks
+# of the SubDagOperator through the UI, then the log files are created
+# by the user that runs the Airflow command. For example, the Airflow
+# run command may be run by the `airflow_sudoable` user, but the Airflow
+# tasks may be run by the `airflow` user. If the log files are not
+# writable by both users, then it's possible that re-running a task
+# via the UI (or vice versa) results in a permission error as the task
+# tries to write to a log file created by the other user.
+log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+# Create the log file and give it group writable permissions
+# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+# operator is not compatible with impersonation (e.g. if a Celery executor is used
+# for a SubDag operator and the SubDag operator has a different owner than the
+# parent DAG)
+if not os.path.exists(directory):
+# Create the directory as globally writable using custom mkdirs
+# as os.makedirs doesn't set mode properly.
+mkdirs(directory, 0o775)
+iso = args.execution_date.isoformat()
+filename = "{directory}/{iso}".format(**locals())
+
+if not os.path.exists(filename):
+open(filename, "

incubator-airflow git commit: [AIRFLOW-1385] Create abstraction for Airflow task logging

2017-07-20 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 392772326 -> e6ef06c53


[AIRFLOW-1385] Create abstraction for Airflow task logging

This PR adds the ability to provide customized
implementations of airflow task logging. It
creates an abstraction for setting up, cleaning
up, and getting task instance logs.

This change is primarily a refactor of logging
logic. It is tested locally with custom logging
implementations.
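
Roughly, the shape of the abstraction (method names below are illustrative,
not necessarily the PR's exact interface; see
airflow/utils/log/file_task_handler.py in the diff for the concrete code):

{code}
# Sketch only, under assumed method names.
class TaskLogHandlerSketch(object):
    def set_context(self, task_instance):
        """Set up logging for a specific task instance (e.g. open a file)."""

    def close(self):
        """Clean up after the task finishes (flush, upload to remote, etc.)."""

    def read(self, task_instance):
        """Return the stored log for the task instance for display."""
{code}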

Closes #2422 from AllisonWang/allison--log-handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e6ef06c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e6ef06c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e6ef06c5

Branch: refs/heads/master
Commit: e6ef06c53fd4449db6e665cce5cad9418dde232f
Parents: 3927723
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Thu Jul 20 18:03:23 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jul 20 18:03:26 2017 -0700

--
 airflow/bin/cli.py  | 111 +++--
 airflow/config_templates/__init__.py|  13 ++
 airflow/config_templates/default_airflow.cfg|   6 +
 .../config_templates/default_airflow_logging.py |  73 +
 airflow/settings.py |  13 +-
 airflow/task_runner/base_task_runner.py |   1 +
 airflow/utils/log/__init__.py   |  13 ++
 airflow/utils/log/file_task_handler.py  | 158 +++
 airflow/utils/log/s3_task_handler.py|  90 +++
 airflow/utils/logging.py|  11 ++
 airflow/www/views.py| 104 +++-
 11 files changed, 416 insertions(+), 177 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f568d5d..11f415a 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,7 +22,6 @@ import os
 import socket
 import subprocess
 import textwrap
-import warnings
 from importlib import import_module
 
 import argparse
@@ -52,8 +51,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 Connection)
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils import logging as logging_utils
-from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -327,55 +324,6 @@ def run(args, dag=None):
 settings.configure_vars()
 settings.configure_orm()
 
-logging.root.handlers = []
-if args.raw:
-# Output to STDOUT for the parent process to read and log
-logging.basicConfig(
-stream=sys.stdout,
-level=settings.LOGGING_LEVEL,
-format=settings.LOG_FORMAT)
-else:
-# Setting up logging to a file.
-
-# To handle log writing when tasks are impersonated, the log files need to
-# be writable by the user that runs the Airflow command and the user
-# that is impersonated. This is mainly to handle corner cases with the
-# SubDagOperator. When the SubDagOperator is run, all of the operators
-# run under the impersonated user and create appropriate log files
-# as the impersonated user. However, if the user manually runs tasks
-# of the SubDagOperator through the UI, then the log files are created
-# by the user that runs the Airflow command. For example, the Airflow
-# run command may be run by the `airflow_sudoable` user, but the Airflow
-# tasks may be run by the `airflow` user. If the log files are not
-# writable by both users, then it's possible that re-running a task
-# via the UI (or vice versa) results in a permission error as the task
-# tries to write to a log file created by the other user.
-log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
-# Create the log file and give it group writable permissions
-# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-# operator is not compatible with impersonation (e.g. if a Celery executor is used
-# for a SubDag operator and the SubDag operator has a different owner than the
-# parent DAG)
-if not os.path.exists(directory):
-# Create the directory as globally writable using custom mkdirs
-# as os.makedirs doesn't set mode properly.
-mkdirs(direc

[jira] [Created] (AIRFLOW-1432) NVD3 Charts do not have labeled axes and units change dynamically

2017-07-19 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1432:


 Summary: NVD3 Charts do not have labeled axes and units change 
dynamically
 Key: AIRFLOW-1432
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1432
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov


E.g. for the landing times chart, the y axis isn't labeled (e.g. 
minutes/hours/days), and its units change dynamically based on the data points 
in the chart and the pixel height of the chart. The y axis should be labeled in 
these charts.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1247] Fix ignore all dependencies argument ignored

2017-07-13 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4322d6dae -> e88ecff6a


[AIRFLOW-1247] Fix ignore all dependencies argument ignored

Fix a typo (a missing comma) in the ignore_all_dependencies argument
definition that caused the argument to be ignored.
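
The root cause is Python's implicit concatenation of adjacent string
literals; without the comma, the "store_true" action was silently fused into
the help string instead of being passed as a separate argument (illustrative
sketch, not the Arg call itself):

{code}
# Illustrative only. Without the comma the two literals become one string:
help_and_action = ("Ignores all non-critical dependencies"
                   "store_true")   # one value: "...dependenciesstore_true"
# With the comma they are two separate values, as intended:
help_text, action = ("Ignores all non-critical dependencies",
                     "store_true")
{code}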

Closes #2441 from aoen/ddavydov--
fix_ignore_all_deps_cli


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e88ecff6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e88ecff6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e88ecff6

Branch: refs/heads/master
Commit: e88ecff6ac71758d763dd5037b177e7a2fbc9896
Parents: 4322d6d
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Thu Jul 13 15:38:00 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jul 13 15:38:03 2017 -0700

--
 airflow/bin/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e88ecff6/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 4b3a0ed..f568d5d 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1307,7 +1307,7 @@ class CLIFactory(object):
 'ignore_all_dependencies': Arg(
 ("-A", "--ignore_all_dependencies"),
 "Ignores all non-critical dependencies, including ignore_ti_state 
and "
-"ignore_task_deps"
+"ignore_task_deps",
 "store_true"),
 # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
 # vague (e.g. a task being in the appropriate state to be run is also a dependency



[jira] [Assigned] (AIRFLOW-1247) CLI: ignore all dependencies argument ignored

2017-07-13 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov reassigned AIRFLOW-1247:


Assignee: Dan Davydov  (was: Matti Remes)

> CLI: ignore all dependencies argument ignored
> -
>
> Key: AIRFLOW-1247
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1247
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Affects Versions: 1.8.1
>Reporter: Matti Remes
>Assignee: Dan Davydov
>Priority: Trivial
> Fix For: 1.9.0
>
>
> Missing comma in Arg object creation



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1366] Add max_tries to task instance

2017-07-10 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b532d8d77 -> 4f20f6077


[AIRFLOW-1366] Add max_tries to task instance

Right now Airflow deletes the task instance when
a user clears it. We have no way of keeping track
of how many times a task instance gets run, either
by a user or by itself. So instead of deleting the
task instance record, we should keep the task
instance and make try_number monotonically
increasing for every task instance attempt.
max_tries is introduced as an upper bound on
retries performed by the task itself.

This new column will be used to update logic
behind clear_task_instances.
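
A hedged sketch of the intended clear semantics (names assumed; the actual
change to clear_task_instances is follow-up work, not part of this commit):

{code}
# Sketch under assumptions, not the shipped logic: keep the row, keep
# try_number monotonic, and extend max_tries so the task can retry again.
def clear_task_instance(ti, task):
    ti.max_tries = ti.try_number + task.retries  # grant a fresh retry budget
    ti.state = None                              # make it schedulable again
{code}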

db migration is tested locally.

Closes #2409 from AllisonWang/allison--max-tries


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f20f607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f20f607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f20f607

Branch: refs/heads/master
Commit: 4f20f607764bb3477419321b5dfd0c53ba1db3c0
Parents: b532d8d
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Mon Jul 10 15:26:08 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Jul 10 15:26:12 2017 -0700

--
 ...dc7_add_max_tries_column_to_task_instance.py | 106 +++
 airflow/models.py   |   3 +
 2 files changed, 109 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
--
diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
new file mode 100644
index 000..2d5ffc2
--- /dev/null
+++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -0,0 +1,106 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add max tries column to task instance
+
+Revision ID: cc1e65623dc7
+Revises: 127d2bf2dfa7
+Create Date: 2017-06-19 16:53:12.851141
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'cc1e65623dc7'
+down_revision = '127d2bf2dfa7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from airflow import settings
+from airflow.models import DagBag, TaskInstance
+
+BATCH_SIZE = 5000
+
+def upgrade():
+op.add_column('task_instance', sa.Column('max_tries', sa.Integer,
+server_default="-1"))
+# Check if table task_instance exist before data migration. This check is
+# needed for database that does not create table until migration finishes.
+# Checking task_instance table exists prevent the error of querying
+# non-existing task_instance table.
+engine = settings.engine
+if engine.dialect.has_table(engine, 'task_instance'):
+# Get current session
+connection = op.get_bind()
+sessionmaker = sa.orm.sessionmaker()
+session = sessionmaker(bind=connection)
+dagbag = DagBag(settings.DAGS_FOLDER)
+query = session.query(sa.func.count(TaskInstance.max_tries)).filter(
+TaskInstance.max_tries == -1
+)
+# Separate db query in batch to prevent loading entire table
+# into memory and cause out of memory error.
+while query.scalar():
+tis = session.query(TaskInstance).filter(
+TaskInstance.max_tries == -1
+).limit(BATCH_SIZE).all()
+for ti in tis:
+dag = dagbag.get_dag(ti.dag_id)
+if not dag or not dag.has_task(ti.task_id):
+# task_instance table might not have the up-to-date
+# information, i.e dag or task might be modified or
+# deleted in dagbag but is reflected in task instance
+# table. In this case we do not retry the task that can't
+# be parsed.
+ti.max_tries = ti.try_number
+else:
+task = dag.get_task(ti.task_id)
+ti.max_tries = 

[jira] [Created] (AIRFLOW-1351) airflow cli commands should log error/debug to stderr

2017-06-27 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1351:


 Summary: airflow cli commands should log error/debug to stderr
 Key: AIRFLOW-1351
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1351
 Project: Apache Airflow
  Issue Type: Bug
  Components: cli
Reporter: Dan Davydov


Commands like airflow list_task are printing messages like:
{code}[2017-06-27 20:36:57,116] {models.py:373} DEBUG - Loaded DAG {code}
to stdout, when the only thing that should be printed to stdout is the listed 
dag ids.

This makes it hard for users to parse the output of these commands.
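
A minimal sketch of the proposed behavior (wiring assumed, not Airflow's
current code): route log records to stderr so stdout carries only the
command's real output.

{code}
import logging
import sys

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logging.debug("Loaded DAG ...")  # diagnostic output goes to stderr
print("example_dag_id")          # stdout stays machine-parseable
{code}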



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1311) Improve Webserver Load Time For Large DAGs

2017-06-15 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1311:


 Summary: Improve Webserver Load Time For Large DAGs
 Key: AIRFLOW-1311
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1311
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov


Large DAGs can take an extremely long time to load in the Airflow UI 
(minutes, or a timeout).

The fixes are as follows:
1. Lazy-load DAGs: load up to a certain # of tasks by default, prioritizing 
tasks by their depth, and allow users to expand sections for these DAGs 
(ideally prefetching deeper tasks once the initial set of tasks has rendered); 
see the sketch after this list.
2. Identify bottlenecks/performance issues in both the frontend and backend for 
rendering DAGs on the webserver and fix them. Airflow should be more performant 
when displaying DAGs that are somewhat large, e.g. DAGs that have up to 500 
nodes and 2000 edges (dependencies from one task to another) should render 
within a couple of seconds.
3. Make DAG loading asynchronous in the UI (once the top-level tasks have 
loaded, display them immediately). We might not want to do this, as users might 
try to click something only to have the UI change from underneath them.
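
A hedged sketch of proposal 1 (all names assumed): render shallow tasks
first, up to a budget, and let the UI expand the rest on demand.

{code}
# Sketch only; depth_of is a hypothetical helper returning a task's depth
# from the DAG roots.
def tasks_to_render(dag, budget=200):
    by_depth = sorted(dag.tasks, key=lambda t: depth_of(t))
    return by_depth[:budget]
{code}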

[~saguziel]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-936] Add clear/mark success for DAG in the UI

2017-06-13 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6e3bcd318 -> 3c450fbe1


[AIRFLOW-936] Add clear/mark success for DAG in the UI

This PR adds a modal popup when clicking the
circle DAG icon in the Airflow tree view UI. It
adds the functionality to clear/mark success for
the entire DAG run. This behavior is equivalent
to individually clearing/marking each task
instance in the DAG run. The original logic of
the edit DAG run page is moved to the button
"Edit DAG Run".

Closes #2339 from AllisonWang/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c450fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c450fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c450fbe

Branch: refs/heads/master
Commit: 3c450fbe1abad7b76b59b6b3b15c6e29b4ad8d0f
Parents: 6e3bcd3
Author: Allison Wang <allisonwang...@gmail.com>
Authored: Tue Jun 13 18:56:41 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Jun 13 18:56:44 2017 -0700

--
 airflow/api/common/experimental/mark_tasks.py |  33 +++-
 airflow/www/templates/airflow/dag.html|  60 +++
 airflow/www/templates/airflow/tree.html   |  11 +-
 airflow/www/views.py  | 113 +---
 tests/api/common/mark_tasks.py| 189 -
 5 files changed, 372 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/api/common/experimental/mark_tasks.py
--
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 0ddbf98..82eb4b5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -22,7 +22,6 @@ from airflow.utils.state import State
 
 from sqlalchemy import or_
 
-
 def _create_dagruns(dag, execution_dates, state, run_id_template):
 """
 Infers from the dates which dag runs need to be created and does so.
@@ -181,7 +180,39 @@ def set_state(task, execution_date, upstream=False, downstream=False,
 if len(sub_dag_ids) > 0:
 tis_altered += qry_sub_dag.all()
 
+session.expunge_all()
 session.close()
 
 return tis_altered
 
+def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
+"""
+Set the state of a dag run and all task instances associated with the dag
+run for a specific execution date.
+:param dag: the DAG of which to alter state
+:param execution_date: the execution date from which to start looking
+:param state: the state to which the DAG need to be set
+:param commit: commit DAG and tasks to be altered to the database
+:return: list of tasks that have been created and updated
+:raises: AssertionError if dag or execution_date is invalid
+"""
+res = []
+
+if not dag or not execution_date:
+return res
+
+# Mark all task instances in the dag run
+for task in dag.tasks:
+task.dag = dag
+new_state = set_state(task=task, execution_date=execution_date,
+  state=state, commit=commit)
+res.extend(new_state)
+
+# Mark the dag run
+if commit:
+drs = DagRun.find(dag.dag_id, execution_date=execution_date)
+for dr in drs:
+dr.dag = dag
+dr.update_state()
+
+return res

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/dag.html
--
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index e5a305c..706ed32 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -216,6 +216,36 @@
   
 
   
+  
+  
+
+  
+
+  
+
+  
+
+
+  
+Edit
+  
+  
+Clear
+  
+  
+Mark Success
+  
+
+
+  
+Close
+  
+
+  
+
+  
 {% endblock %}
 {% block tail %}
   {{ lib.form_js() }}
@@ -239,6 +269,7 @@ function updateQueryStringParameter(uri, key, value) {
   $('.never_active').removeClass('active');
 });
 
+var id = '';
 var dag_id = '{{ dag.dag_id }}';
 var task_id = '';
 var exection_date = '';
@@ -263,6 +294,14 @@ function updateQueryStringParameter(uri, key, value) {
 }
 }
 
+function call_modal_dag(dag) {
+  id = dag && dag.id;
+  execution_date =

[jira] [Resolved] (AIRFLOW-1024) Handle CeleryExecutor errors gracefully

2017-06-08 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov resolved AIRFLOW-1024.
--
Resolution: Fixed

> Handle CeleryExecutor errors gracefully
> ---
>
> Key: AIRFLOW-1024
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1024
> Project: Apache Airflow
>  Issue Type: Bug
>        Reporter: Dan Davydov
>Priority: Critical
> Fix For: 1.8.1
>
>
> If the Airflow celery executor receives a bad response from a worker (e.g. 
> unpickled response), then it will crash the scheduler and cause it to restart.
> We should code defensively around the interactions with celery so that we 
> just log errors instead of crashing the scheduler.
> It might make sense to make the try catches one level higher (to catch 
> errors from all executors), but this needs some investigation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1288) Bad owners field in DAGs breaks Airflow front page

2017-06-07 Thread Dan Davydov (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Davydov updated AIRFLOW-1288:
-
Description: 
DAGs that have owners set to a bad value break the front page of the webserver 
with an error like the one below. Instead, these should just cause import errors 
for the specific dags in question.
{code}
Ooops.

  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . ) (   )
  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
  ( (  (   ) (  )   (  )) ) _)(   )  )  )
 ( (  ( \ ) ((_  ( ) ( )  )   ) )  )) ( )
  (  (   (  (   (_ ( ) ( _)  ) (  )  )   )
 ( (  ( (  (  ) (_  )  ) )  _)   ) _( ( )
  ((  (   )(( _)   _) _(_ (  (_ )
   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
   ((__)\\||lll|l||///  \_))
(   /(/ (  )  ) )\   )
  (( ( ( | | ) ) )\   )
   (   /(| / ( )) ) ) )) )
 ( ( _(|)_) )
  (  ||\(|(|)|/|| )
(|(||(||))
  ( //|/l|||)|\\ \ )
(/ / //  /|//\\  \ \  \ _)
---
Node: i-0dbbddfb63fb2cfbc.inst.aws.airbnb.com
---
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in 
wsgi_app
response = self.full_dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in 
full_dispatch_request
rv = self.handle_user_exception(e)
  File 
"/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
line 103, in _nr_wrapper_Flask_handle_exception_
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in 
handle_user_exception
reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in 
full_dispatch_request
rv = self.dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in 
dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
line 40, in _nr_wrapper_handler_
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, 
in inner
return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 367, 
in _run_view
return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 758, in 
decorated_view
return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 
1909, in index
all_dag_ids=all_dag_ids)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 307, 
in render
return render_template(template, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", 
line 110, in literal_wrapper
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 128, 
in render_template
context, ctx.app)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 110, 
in _render
rv = template.render(context)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", 
line 98, in dynamic_wrapper
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 
989, in render
return self.environment.handle_exception(exc_info, True)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 
754, in handle_exception
reraise(exc_type, exc_value, tb)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow/www/templates/airflow/dags.html",
 line 18, in top-level template code
{% extends "airflow/master.html" %}
  File 
"/usr/local/lib/python2.7/dist-packages/airflow/www/templates/airflow/master.html",
 line 18, in top-level template code
{% extends "admin/master.html" %}
  File 
"/usr/local/lib/

[jira] [Created] (AIRFLOW-1288) Bad owners field in DAGs breaks Airflow front page

2017-06-07 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1288:


 Summary: Bad owners field in DAGs breaks Airflow front page
 Key: AIRFLOW-1288
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1288
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov


DAGs that have owners set to a bad value break the front page of the webserver 
with an error like the one below. Instead, these should just cause import errors 
for the specific dags in question.
{code}
Ooops.

  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . ) (   )
  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
  ( (  (   ) (  )   (  )) ) _)(   )  )  )
 ( (  ( \ ) ((_  ( ) ( )  )   ) )  )) ( )
  (  (   (  (   (_ ( ) ( _)  ) (  )  )   )
 ( (  ( (  (  ) (_  )  ) )  _)   ) _( ( )
  ((  (   )(( _)   _) _(_ (  (_ )
   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
   ((__)\\||lll|l||///  \_))
(   /(/ (  )  ) )\   )
  (( ( ( | | ) ) )\   )
   (   /(| / ( )) ) ) )) )
 ( ( _(|)_) )
  (  ||\(|(|)|/|| )
(|(||(||))
  ( //|/l|||)|\\ \ )
(/ / //  /|//\\  \ \  \ _)
---
Node: i-0dbbddfb63fb2cfbc.inst.aws.airbnb.com
---
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in 
wsgi_app
response = self.full_dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in 
full_dispatch_request
rv = self.handle_user_exception(e)
  File 
"/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
line 103, in _nr_wrapper_Flask_handle_exception_
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in 
handle_user_exception
reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in 
full_dispatch_request
rv = self.dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in 
dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
line 40, in _nr_wrapper_handler_
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, 
in inner
return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 367, 
in _run_view
return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 758, in 
decorated_view
return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 
1909, in index
all_dag_ids=all_dag_ids)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 307, 
in render
return render_template(template, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", 
line 110, in literal_wrapper
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 128, 
in render_template
context, ctx.app)
  File "/usr/local/lib/python2.7/dist-packages/flask/templating.py", line 110, 
in _render
rv = template.render(context)
  File "/usr/local/lib/python2.7/dist-packages/newrelic/api/function_trace.py", 
line 98, in dynamic_wrapper
return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 
989, in render
return self.environment.handle_exception(exc_info, True)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 
754, in handle_exception
reraise(exc_type, exc_value, tb)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow/www/templates/airflow/dags.html",
 line 18, in top-level template code
{% extends "airflow/master.html" %}
  File 
"/usr/local/lib/python2.7/dis

[jira] [Created] (AIRFLOW-1280) Gantt Chart Height Isn't Set Properly

2017-06-05 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-1280:


 Summary: Gantt Chart Height Isn't Set Properly
 Key: AIRFLOW-1280
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1280
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov


The Gantt view has a dynamic height for the chart body based on the number of 
tasks, but it's only a heuristic. Instead, we should calculate the height of the 
chart legend and make the height of the chart equal to the height of the legend 
plus the desired height for the chart body, as sketched below.
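
A hedged sketch of the proposed calculation (names assumed):

{code}
# Sketch only: measure the rendered legend instead of guessing from task count.
def gantt_chart_height(legend_height_px, desired_body_height_px=600):
    return legend_height_px + desired_body_height_px
{code}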



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1263] Dynamic height for charts

2017-06-05 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 53ad99106 -> e3e6aa719


[AIRFLOW-1263] Dynamic height for charts

Dynamic heights for webserver charts so that
longer task names fit.

Closes #2344 from aoen/ddavydov--
dynamic_chart_heights


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e3e6aa71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e3e6aa71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e3e6aa71

Branch: refs/heads/master
Commit: e3e6aa71984f5fe2e9c36d1f177054b4457f5b34
Parents: 53ad991
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Jun 5 16:32:42 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Jun 5 16:32:46 2017 -0700

--
 airflow/www/views.py | 25 -
 1 file changed, 20 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e3e6aa71/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e250111..4fd52fe 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -284,6 +284,16 @@ def should_hide_value_for_key(key_name):
and conf.getboolean('admin', 'hide_sensitive_variable_fields')
 
 
+
+def get_chart_height(dag):
+"""
+TODO(aoen): See [AIRFLOW-1263] We use the number of tasks in the DAG as a heuristic to
+approximate the size of generated chart (otherwise the charts are tiny and unreadable
+when DAGs have a large number of tasks). Ideally nvd3 should allow for dynamic-height
+charts, that is charts that take up space based on the size of the components within.
+"""
+return 600 + len(dag.tasks) * 10
+
 class Airflow(BaseView):
 
 def is_visible(self):
@@ -1411,10 +1421,12 @@ class Airflow(BaseView):
 include_upstream=True,
 include_downstream=False)
 
+
+chart_height = get_chart_height(dag)
 chart = nvd3.lineChart(
-name="lineChart", x_is_date=True, height=600, width="1200")
+name="lineChart", x_is_date=True, height=chart_height, 
width="1200")
 cum_chart = nvd3.lineChart(
-name="cumLineChart", x_is_date=True, height=600, width="1200")
+name="cumLineChart", x_is_date=True, height=chart_height, 
width="1200")
 
 y = defaultdict(list)
 x = defaultdict(list)
@@ -1516,8 +1528,10 @@ class Airflow(BaseView):
 include_upstream=True,
 include_downstream=False)
 
+chart_height = get_chart_height(dag)
 chart = nvd3.lineChart(
-name="lineChart", x_is_date=True, y_axis_format='d', height=600, 
width="1200")
+name="lineChart", x_is_date=True, y_axis_format='d', 
height=chart_height,
+width="1200")
 
 for task in dag.tasks:
 y = []
@@ -1578,8 +1592,9 @@ class Airflow(BaseView):
 include_upstream=True,
 include_downstream=False)
 
+chart_height = get_chart_height(dag)
 chart = nvd3.lineChart(
-name="lineChart", x_is_date=True, height=600, width="1200")
+name="lineChart", x_is_date=True, height=chart_height, 
width="1200")
 y = {}
 x = {}
 for task in dag.tasks:
@@ -1622,7 +1637,7 @@ class Airflow(BaseView):
 'airflow/chart.html',
 dag=dag,
 chart=chart.htmlcontent,
-height="700px",
+height=str(chart_height + 100) + "px",
 demo_mode=conf.getboolean('webserver', 'demo_mode'),
 root=root,
 form=form,


