[jira] [Commented] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered

2018-04-26 Thread Arthur Wiedmer (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455717#comment-16455717
 ] 

Arthur Wiedmer commented on AIRFLOW-2385:
-

Hi Yohei,

Unless I am mistaken, it looks like your operator is executing a Spark job (I 
seem to recognize the progress bar in the logs). execution_timeout will only 
raise an exception in the Python process; it might not kill the job itself.

You probably want to implement the on_kill method for your operator so that it 
knows how to clean up your process. It has already been implemented in a few 
operators in the code base.
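
For reference, here is a minimal sketch of what that could look like, assuming 
the operator launches its work as a subprocess and keeps a handle to it (the 
`self._sp` attribute and the spark-submit command line are illustrative, not 
taken from your operator):

{code}
import subprocess

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(self, some_parameters_here, *args, **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self._sp = None  # handle to the external process, if any

    def execute(self, context):
        # Illustrative: launch the external job as a subprocess and keep
        # the handle around so on_kill can reach it later.
        self._sp = subprocess.Popen(['spark-submit', 'job.py'])
        self._sp.wait()

    def on_kill(self):
        # Called by Airflow when the task is killed (e.g. on timeout);
        # clean up the external process here.
        if self._sp is not None:
            self._sp.terminate()
{code}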

Good luck!

> Airflow task is not stopped when execution timeout gets triggered
> -
>
> Key: AIRFLOW-2385
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2385
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.9.0
>Reporter: Yohei Onishi
>Priority: Major
>
> I have my own custom operator that extends BaseOperator, as shown below. I 
> tried to kill the task if it runs for more than 30 minutes. The timeout seems 
> to be triggered according to the log, but the task still continued.
> Am I missing something? I checked the official documentation but do not know 
> what is wrong: [https://airflow.apache.org/code.html#baseoperator]
> My operator is as follows.
> {code:java}
> class MyOperator(BaseOperator):
>     @apply_defaults
>     def __init__(
>             self,
>             some_parameters_here,
>             *args,
>             **kwargs):
>         super(MyOperator, self).__init__(*args, **kwargs)
>         # some initialization here
>
>     def execute(self, context):
>         # some code here
> {code}
>  
> My task is as follows.
> {code:java}
> t = MyOperator(
>     task_id='task',
>     dag=scheduled_dag,
>     execution_timeout=timedelta(minutes=30))
> {code}
>  
> I found this error but the task continued.
> {code:java}
> [2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 
> 6:==(1380 + -160) / 1224][2018-04-12 03:30:28,353] {timeout.py:36} ERROR - 
> Process timed out
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered

2018-04-26 Thread Yohei Onishi (JIRA)
Yohei Onishi created AIRFLOW-2385:
-

 Summary: Airflow task is not stopped when execution timeout gets 
triggered
 Key: AIRFLOW-2385
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2385
 Project: Apache Airflow
  Issue Type: Bug
  Components: DAG
Affects Versions: 1.9.0
Reporter: Yohei Onishi


I have my own custom operator that extends BaseOperator, as shown below. I tried 
to kill the task if it runs for more than 30 minutes. The timeout seems to be 
triggered according to the log, but the task still continued.

Am I missing something? I checked the official documentation but do not know 
what is wrong: [https://airflow.apache.org/code.html#baseoperator]

My operator is as follows.
{code:java}
class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self,
            some_parameters_here,
            *args,
            **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        # some initialization here

    def execute(self, context):
        # some code here
{code}
 

My task is as follows.
{code:java}
t = MyOperator(
  task_id='task',
  dag=scheduled_dag,
  execution_timeout=timedelta(minutes=30))
{code}
 

I found this error but the task continued.
{code:java}
[2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 
6:==(1380 + -160) / 1224][2018-04-12 03:30:28,353] {timeout.py:36} ERROR - 
Process timed out
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (AIRFLOW-2378) Add Groupon to README

2018-04-26 Thread Siddharth Anand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siddharth Anand resolved AIRFLOW-2378.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

Issue resolved by pull request #3267
[https://github.com/apache/incubator-airflow/pull/3267]

> Add Groupon to README
> -
>
> Key: AIRFLOW-2378
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2378
> Project: Apache Airflow
>  Issue Type: Wish
>Reporter: steven casey
>Assignee: steven casey
>Priority: Trivial
> Fix For: 2.0.0
>
>
> Add Groupon to current list of Airflow users



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2378) Add Groupon to README

2018-04-26 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455645#comment-16455645
 ] 

ASF subversion and git services commented on AIRFLOW-2378:
--

Commit 801fe7dbdcc74988fe937e013d7f019df87c44e5 in incubator-airflow's branch 
refs/heads/master from [~stevencasey]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=801fe7d ]

[AIRFLOW-2378] Add Groupon to list of current users

Closes #3267 from stevencasey/add_groupon


> Add Groupon to README
> -
>
> Key: AIRFLOW-2378
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2378
> Project: Apache Airflow
>  Issue Type: Wish
>Reporter: steven casey
>Assignee: steven casey
>Priority: Trivial
> Fix For: 2.0.0
>
>
> Add Groupon to current list of Airflow users



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2378] Add Groupon to list of current users

2018-04-26 Thread sanand
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ae63246a1 -> 801fe7dbd


[AIRFLOW-2378] Add Groupon to list of current users

Closes #3267 from stevencasey/add_groupon


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/801fe7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/801fe7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/801fe7db

Branch: refs/heads/master
Commit: 801fe7dbdcc74988fe937e013d7f019df87c44e5
Parents: ae63246
Author: steven casey <265229+stevenca...@users.noreply.github.com>
Authored: Thu Apr 26 18:47:28 2018 -0700
Committer: r39132 
Committed: Thu Apr 26 18:47:28 2018 -0700

--
 README.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/801fe7db/README.md
--
diff --git a/README.md b/README.md
index 5e5193c..b504cbd 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,7 @@ Currently **officially** using Airflow:
 1. [GovTech GDS](https://gds-gov.tech) 
[[@chrissng](https://github.com/chrissng) & 
[@datagovsg](https://github.com/datagovsg)]
 1. [Grand Rounds](https://www.grandrounds.com/) 
[[@richddr](https://github.com/richddr), 
[@timz1290](https://github.com/timz1290), 
[@wenever](https://github.com/@wenever), & 
[@runongirlrunon](https://github.com/runongirlrunon)]
 1. [Groupalia](http://es.groupalia.com) 
[[@jesusfcr](https://github.com/jesusfcr)]
+1. [Groupon](https://groupon.com) 
[[@stevencasey](https://github.com/stevencasey)]
 1. [Gusto](https://gusto.com) [[@frankhsu](https://github.com/frankhsu)]
 1. [Handshake](https://joinhandshake.com/) 
[[@mhickman](https://github.com/mhickman)]
 1. [Handy](http://www.handy.com/careers/73115?gh_jid=73115_src=o5qcxn) 
[[@marcintustin](https://github.com/marcintustin) / 
[@mtustin-handy](https://github.com/mtustin-handy)]



[jira] [Commented] (AIRFLOW-2382) Fix wrong description for delimiter

2018-04-26 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455642#comment-16455642
 ] 

ASF subversion and git services commented on AIRFLOW-2382:
--

Commit ae63246a1d580d77c366276300139165e4c984de in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ae63246 ]

[AIRFLOW-2382] Fix wrong description for delimiter

Fix misleading descriptions for the 'delimiter'
parameter in S3ListOperator and
S3ToGoogleCloudStorageOperator's docstring.

Closes #3270 from sekikn/AIRFLOW-2382


> Fix wrong description for delimiter
> ---
>
> Key: AIRFLOW-2382
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2382
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws, operators
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.0.0
>
>
> The document for S3ListOperator says:
> {code}
> :param delimiter: The delimiter by which you want to filter the objects.
> For e.g to lists the CSV files from in a directory in S3 you would use
> delimiter='.csv'.
> {code}
> {code}
> **Example**:
> The following operator would list all the CSV files from the S3
> ``customers/2018/04/`` key in the ``data`` bucket. ::
> s3_file = S3ListOperator(
> task_id='list_3s_files',
> bucket='data',
> prefix='customers/2018/04/',
> delimiter='.csv',
> aws_conn_id='aws_customers_conn'
> )
> {code}
> but it actually behaves the opposite way:
> {code}
> In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator
> In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3').execute(None)
> [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe']
> In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3', delimiter='.csv').execute(None)
> [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[3]: ['1.txt', '2.jpg', '3.exe']
> {code}
> This is because the 'delimiter' parameter represents the path hierarchy (so 
> '/' is typically used), not a file extension. S3ToGoogleCloudStorageOperator 
> has the same problem.
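
A minimal sketch of the intended usage, assuming the same bucket layout as in 
the example above (extension filtering happens on the returned key list, since 
`delimiter` only marks the key hierarchy; names are illustrative):

{code}
from airflow.contrib.operators.s3_list_operator import S3ListOperator

# List the keys directly under the prefix; '/' marks the key hierarchy.
s3_files = S3ListOperator(
    task_id='list_s3_files',
    bucket='data',
    prefix='customers/2018/04/',
    delimiter='/',
    aws_conn_id='aws_customers_conn')

# To keep only the CSV files, filter the returned list by suffix instead.
csv_keys = [key for key in s3_files.execute(None) if key.endswith('.csv')]
{code}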



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2382) Fix wrong description for delimiter

2018-04-26 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455643#comment-16455643
 ] 

ASF subversion and git services commented on AIRFLOW-2382:
--

Commit ae63246a1d580d77c366276300139165e4c984de in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ae63246 ]

[AIRFLOW-2382] Fix wrong description for delimiter

Fix misleading descriptions for the 'delimiter'
parameter in S3ListOperator and
S3ToGoogleCloudStorageOperator's docstring.

Closes #3270 from sekikn/AIRFLOW-2382


> Fix wrong description for delimiter
> ---
>
> Key: AIRFLOW-2382
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2382
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws, operators
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.0.0
>
>
> The document for S3ListOperator says:
> {code}
> :param delimiter: The delimiter by which you want to filter the objects.
> For e.g to lists the CSV files from in a directory in S3 you would use
> delimiter='.csv'.
> {code}
> {code}
> **Example**:
> The following operator would list all the CSV files from the S3
> ``customers/2018/04/`` key in the ``data`` bucket. ::
> s3_file = S3ListOperator(
> task_id='list_3s_files',
> bucket='data',
> prefix='customers/2018/04/',
> delimiter='.csv',
> aws_conn_id='aws_customers_conn'
> )
> {code}
> but it actually behaves the opposite way:
> {code}
> In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator
> In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3').execute(None)
> [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe']
> In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3', delimiter='.csv').execute(None)
> [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[3]: ['1.txt', '2.jpg', '3.exe']
> {code}
> This is because the 'delimiter' parameter represents the path hierarchy (so 
> '/' is typically used), not a file extension. S3ToGoogleCloudStorageOperator 
> has the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (AIRFLOW-2382) Fix wrong description for delimiter

2018-04-26 Thread Siddharth Anand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siddharth Anand resolved AIRFLOW-2382.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

Issue resolved by pull request #3270
[https://github.com/apache/incubator-airflow/pull/3270]

> Fix wrong description for delimiter
> ---
>
> Key: AIRFLOW-2382
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2382
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws, operators
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.0.0
>
>
> The document for S3ListOperator says:
> {code}
> :param delimiter: The delimiter by which you want to filter the objects.
> For e.g to lists the CSV files from in a directory in S3 you would use
> delimiter='.csv'.
> {code}
> {code}
> **Example**:
> The following operator would list all the CSV files from the S3
> ``customers/2018/04/`` key in the ``data`` bucket. ::
> s3_file = S3ListOperator(
> task_id='list_3s_files',
> bucket='data',
> prefix='customers/2018/04/',
> delimiter='.csv',
> aws_conn_id='aws_customers_conn'
> )
> {code}
> but it actually behaves the opposite way:
> {code}
> In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator
> In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3').execute(None)
> [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe']
> In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3', delimiter='.csv').execute(None)
> [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[3]: ['1.txt', '2.jpg', '3.exe']
> {code}
> This is because the 'delimiter' parameter represents the path hierarchy (so 
> '/' is typically used), not a file extension. S3ToGoogleCloudStorageOperator 
> has the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2382] Fix wrong description for delimiter

2018-04-26 Thread sanand
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 36193fc74 -> ae63246a1


[AIRFLOW-2382] Fix wrong description for delimiter

Fix misleading descriptions for the 'delimiter'
parameter in S3ListOperator and
S3ToGoogleCloudStorageOperator's docstring.

Closes #3270 from sekikn/AIRFLOW-2382


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae63246a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae63246a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae63246a

Branch: refs/heads/master
Commit: ae63246a1d580d77c366276300139165e4c984de
Parents: 36193fc
Author: Kengo Seki 
Authored: Thu Apr 26 18:45:12 2018 -0700
Committer: r39132 
Committed: Thu Apr 26 18:45:12 2018 -0700

--
 airflow/contrib/operators/s3_list_operator.py   | 12 +---
 airflow/contrib/operators/s3_to_gcs_operator.py |  6 ++
 2 files changed, 7 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_list_operator.py
--
diff --git a/airflow/contrib/operators/s3_list_operator.py 
b/airflow/contrib/operators/s3_list_operator.py
index 1dcbc60..dbb45fe 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -24,8 +24,7 @@ from airflow.utils.decorators import apply_defaults
 
 class S3ListOperator(BaseOperator):
 """
-List all objects from the bucket with the given string prefix and delimiter
-in name.
+List all objects from the bucket with the given string prefix in name.
 
 This operator returns a python list with the name of objects which can be
 used by `xcom` in the downstream task.
@@ -35,22 +34,21 @@ class S3ListOperator(BaseOperator):
 :param prefix: Prefix string to filters the objects whose name begin with
 such prefix
 :type prefix: string
-:param delimiter: The delimiter by which you want to filter the objects.
-For e.g to lists the CSV files from in a directory in S3 you would use
-delimiter='.csv'.
+:param delimiter: the delimiter marks key hierarchy.
 :type delimiter: string
 :param aws_conn_id: The connection ID to use when connecting to S3 storage.
 :type aws_conn_id: string
 
 **Example**:
-The following operator would list all the CSV files from the S3
+The following operator would list all the files
+(excluding subfolders) from the S3
 ``customers/2018/04/`` key in the ``data`` bucket. ::
 
 s3_file = S3ListOperator(
 task_id='list_3s_files',
 bucket='data',
 prefix='customers/2018/04/',
-delimiter='.csv',
+delimiter='/',
 aws_conn_id='aws_customers_conn'
 )
 """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_to_gcs_operator.py
--
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py 
b/airflow/contrib/operators/s3_to_gcs_operator.py
index d105596..5a2004d 100644
--- a/airflow/contrib/operators/s3_to_gcs_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -37,12 +37,10 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
 :param prefix: Prefix string which filters objects whose name begin with
 such prefix.
 :type prefix: string
-:param delimiter: The delimiter by which you want to filter the objects on.
-E.g. to list CSV files from a S3 key you would do the following,
-`delimiter='.csv'`.
+:param delimiter: the delimiter marks key hierarchy.
 :type delimiter: string
 :param aws_conn_id: The source S3 connection
-:type aws_conn_id: str
+:type aws_conn_id: string
 :param dest_gcs_conn_id: The destination connection ID to use
 when connecting to Google Cloud Storage.
 :type dest_gcs_conn_id: string



[jira] [Updated] (AIRFLOW-2384) Flask 0.12.3+ breaks Airflow webserver

2018-04-26 Thread Kyle Hamlin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Hamlin updated AIRFLOW-2384:
-
Description: 
Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes to 
Airflow's webserver
{code:java}
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
135, in handle
self.handle_request(listener, req, client, addr)
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
176, in handle_request
respiter = self.wsgi(environ, resp.start_response)
File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in 
__call__
return app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in 
__call__
return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in 
wsgi_app
ctx.push()
File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push
self.session = self.app.open_session(self.request)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in 
open_session
return self.session_interface.open_session(self, request)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in 
open_session
s = self.get_signing_serializer(app)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in 
get_signing_serializer
signer_kwargs=signer_kwargs)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in 
__init__
self.is_text_serializer = is_text_serializer(serializer)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in 
is_text_serializer
return isinstance(serializer.dumps({}), text_type)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in 
dumps
return json.dumps(_tag(value), separators=(',', ':'))
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, 
in dumps
_dump_arg_defaults(kwargs)
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, 
in _dump_arg_defaults
bp.json_encoder if bp and bp.json_encoder
AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code}

  was:
Flask 12.3.0 and 1.0.0 were released about an hour ago with breaking changes to 
Airflows webserver
{code:java}
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
135, in handle
self.handle_request(listener, req, client, addr)
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
176, in handle_request
respiter = self.wsgi(environ, resp.start_response)
File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in 
__call__
return app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in 
__call__
return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in 
wsgi_app
ctx.push()
File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push
self.session = self.app.open_session(self.request)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in 
open_session
return self.session_interface.open_session(self, request)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in 
open_session
s = self.get_signing_serializer(app)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in 
get_signing_serializer
signer_kwargs=signer_kwargs)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in 
__init__
self.is_text_serializer = is_text_serializer(serializer)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in 
is_text_serializer
return isinstance(serializer.dumps({}), text_type)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in 
dumps
return json.dumps(_tag(value), separators=(',', ':'))
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, 
in dumps
_dump_arg_defaults(kwargs)
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, 
in _dump_arg_defaults
bp.json_encoder if bp and bp.json_encoder
AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code}


> Flask 0.12.3+ breaks Airflow webserver
> --
>
> Key: AIRFLOW-2384
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2384
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.10.0
>Reporter: Kyle Hamlin
>Priority: Critical
> Fix For: 1.10.0
>
>
> Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes 
> to Airflow's webserver
> {code:java}
> File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
> 135, in handle
> self.handle_request(listener, req, 

[jira] [Created] (AIRFLOW-2384) Flask 0.12.3+ breaks Airflow webserver

2018-04-26 Thread Kyle Hamlin (JIRA)
Kyle Hamlin created AIRFLOW-2384:


 Summary: Flask 0.12.3+ breaks Airflow webserver
 Key: AIRFLOW-2384
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2384
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Affects Versions: 1.10.0
Reporter: Kyle Hamlin
 Fix For: 1.10.0


Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes to 
Airflow's webserver
{code:java}
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
135, in handle
self.handle_request(listener, req, client, addr)
File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 
176, in handle_request
respiter = self.wsgi(environ, resp.start_response)
File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in 
__call__
return app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in 
__call__
return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in 
wsgi_app
ctx.push()
File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push
self.session = self.app.open_session(self.request)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in 
open_session
return self.session_interface.open_session(self, request)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in 
open_session
s = self.get_signing_serializer(app)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in 
get_signing_serializer
signer_kwargs=signer_kwargs)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in 
__init__
self.is_text_serializer = is_text_serializer(serializer)
File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in 
is_text_serializer
return isinstance(serializer.dumps({}), text_type)
File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in 
dumps
return json.dumps(_tag(value), separators=(',', ':'))
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, 
in dumps
_dump_arg_defaults(kwargs)
File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, 
in _dump_arg_defaults
bp.json_encoder if bp and bp.json_encoder
AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code}
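
Until a fix lands, one possible workaround (an assumption on my part, not 
something confirmed in this thread) is to pin Flask below the breaking releases 
in the environment that runs the webserver:

{code}
pip install 'flask<0.12.3'
{code}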



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kevin Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454925#comment-16454925
 ] 

Kevin Yang commented on AIRFLOW-2363:
-

[~b11c] If the case is that set_context is not called properly, then this will 
not resolve AIRFLOW-2379.

 

The entry point of the task handler's set_context() method should be here: 
[https://github.com/apache/incubator-airflow/blob/master/airflow/bin/cli.py#L460], 
which is called in every `airflow run` command. I am not sure how it could be 
missed.

 

I suspect the log is not being uploaded because of the bug being fixed in this 
issue, but I might need your help to confirm that.
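
For context, a minimal sketch of the type mismatch this issue describes (the 
handler functions below are simplified stand-ins, not the real classes):

{code}
# Older task handlers returned the log as a plain string...
def read_old(ti):
    return "log text"

# ...while the new code path returns a (log, metadata) tuple.
def read_new(ti):
    return ("log text", {"end_of_log": True})

logs = [""]
logs[0] += read_old(None)   # fine: str += str
logs[0] += read_new(None)   # TypeError: must be str, not tuple
{code}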

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> following errors thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File 

[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Berislav Lopac (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454914#comment-16454914
 ] 

Berislav Lopac edited comment on AIRFLOW-2363 at 4/26/18 9:39 PM:
--

Is there any chance that this might also resolve AIRFLOW-2379?


was (Author: b11c):
Is there any chance that this might also resolve AIRFLOW-2375?

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> following errors thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  

[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Berislav Lopac (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454914#comment-16454914
 ] 

Berislav Lopac commented on AIRFLOW-2363:
-

Is there any chance that this might also resolve AIRFLOW-2375?

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> following errors thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2380] Add support for environment variables in Spark submit operator.

2018-04-26 Thread arthur
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0d0d0a04 -> 36193fc74


[AIRFLOW-2380] Add support for environment variables in Spark submit operator.

Closes #3268 from piffall/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/36193fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/36193fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/36193fc7

Branch: refs/heads/master
Commit: 36193fc7449ca67c807b54ad17a086b35c0c4471
Parents: b0d0d0a
Author: Cristòfol Torrens 
Authored: Thu Apr 26 14:21:21 2018 -0700
Committer: Arthur Wiedmer 
Committed: Thu Apr 26 14:21:21 2018 -0700

--
 airflow/contrib/hooks/spark_submit_hook.py  | 29 +-
 .../contrib/operators/spark_submit_operator.py  | 10 +++-
 tests/contrib/hooks/test_spark_submit_hook.py   | 59 +++-
 3 files changed, 91 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/36193fc7/airflow/contrib/hooks/spark_submit_hook.py
--
diff --git a/airflow/contrib/hooks/spark_submit_hook.py 
b/airflow/contrib/hooks/spark_submit_hook.py
index 71c68c0..0185cab 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -80,6 +80,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 :type num_executors: int
 :param application_args: Arguments for the application being submitted
 :type application_args: list
+:param env_vars: Environment variables for spark-submit. It
+ supports yarn and k8s mode too.
+:type env_vars: dict
 :param verbose: Whether to pass the verbose flag to spark-submit process 
for debugging
 :type verbose: bool
 """
@@ -103,6 +106,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
  name='default-name',
  num_executors=None,
  application_args=None,
+ env_vars=None,
  verbose=False):
 self._conf = conf
 self._conn_id = conn_id
@@ -123,6 +127,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 self._name = name
 self._num_executors = num_executors
 self._application_args = application_args
+self._env_vars = env_vars
 self._verbose = verbose
 self._submit_sp = None
 self._yarn_application_id = None
@@ -209,6 +214,20 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 if self._conf:
 for key in self._conf:
 connection_cmd += ["--conf", "{}={}".format(key, 
str(self._conf[key]))]
+if self._env_vars and (self._is_kubernetes or self._is_yarn):
+if self._is_yarn:
+tmpl = "spark.yarn.appMasterEnv.{}={}"
+else:
+tmpl = "spark.kubernetes.driverEnv.{}={}"
+for key in self._env_vars:
+connection_cmd += [
+"--conf",
+tmpl.format(key, str(self._env_vars[key]))]
+elif self._env_vars and self._connection['deploy_mode'] != "cluster":
+self._env = self._env_vars  # Do it on Popen of the process
+elif self._env_vars and self._connection['deploy_mode'] == "cluster":
+raise AirflowException(
+"SparkSubmitHook env_vars is not supported in 
standalone-cluster mode.")
 if self._is_kubernetes:
 connection_cmd += ["--conf", 
"spark.kubernetes.namespace={}".format(
 self._connection['namespace'])]
@@ -294,6 +313,12 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 :param kwargs: extra arguments to Popen (see subprocess.Popen)
 """
 spark_submit_cmd = self._build_spark_submit_command(application)
+
+if hasattr(self, '_env'):
+env = os.environ.copy()
+env.update(self._env)
+kwargs["env"] = env
+
 self._submit_sp = subprocess.Popen(spark_submit_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,


[jira] [Resolved] (AIRFLOW-2380) Add support for environment variables in Spark submit operator

2018-04-26 Thread Arthur Wiedmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arthur Wiedmer resolved AIRFLOW-2380.
-
   Resolution: Fixed
Fix Version/s: 1.10.0

Issue resolved by pull request #3268
[https://github.com/apache/incubator-airflow/pull/3268]

> Add support for environment variables in Spark submit operator
> --
>
> Key: AIRFLOW-2380
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2380
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib, operators
>Reporter: Cristòfol Torrens
>Assignee: Cristòfol Torrens
>Priority: Minor
> Fix For: 1.10.0
>
>
> Add support for environment variables in Spark submit operator.
> For example, to pass the *HADOOP_CONF_DIR* when using the same Spark cluster 
> with multiple HDFS instances.
> The idea is to pass the variables as a dict and resolve them when using 
> *yarn-*_client/cluster_ and *standalone-*_client_ mode.
> In *standalone-*_cluster_ mode this is not possible.
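
A minimal usage sketch of the new parameter, assuming a yarn connection; the 
paths, IDs, and DAG wiring below are illustrative:

{code}
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    application='/path/to/app.py',
    conn_id='spark_default',
    # Forwarded as spark.yarn.appMasterEnv.* (or spark.kubernetes.driverEnv.*).
    env_vars={'HADOOP_CONF_DIR': '/etc/hadoop/conf-cluster-b'},
    dag=dag)
{code}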



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor

2018-04-26 Thread Kevin Yang (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Yang closed AIRFLOW-2383.
---
Resolution: Invalid

> Escape colon in partition name when poking inside NamedHivePartitionSensor
> --
>
> Key: AIRFLOW-2383
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2383
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Kevin Yang
>Assignee: Kevin Yang
>Priority: Major
>
> NamedHivePartitionSensor does not escape colons in the partition name, 
> causing behavior different from HivePartitionSensor when there is a colon in 
> the partition name. The colon needs to be escaped to `%3A`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor

2018-04-26 Thread Kevin Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454840#comment-16454840
 ] 

Kevin Yang commented on AIRFLOW-2383:
-

When using partition names, users are supposed to specify already-escaped 
values; closing the JIRA.
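
In other words, a partition value containing a colon should be passed to the 
sensor pre-escaped. A minimal sketch, with illustrative table and connection 
names:

{code}
from airflow.operators.sensors import NamedHivePartitionSensor

# The partition value '2018-04-26T00:00:00' is written with its colons
# pre-escaped as %3A, which is what the sensor expects.
wait_for_partition = NamedHivePartitionSensor(
    task_id='wait_for_partition',
    partition_names=['mydb.mytable/dt=2018-04-26T00%3A00%3A00'],
    metastore_conn_id='metastore_default',
    dag=dag)
{code}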

> Escape colon in partition name when poking inside NamedHivePartitionSensor
> --
>
> Key: AIRFLOW-2383
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2383
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Kevin Yang
>Assignee: Kevin Yang
>Priority: Major
>
> NamedHivePartitionSensor does not escape colons in the partition name, 
> causing behavior different from HivePartitionSensor when there is a colon in 
> the partition name. The colon needs to be escaped to `%3A`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread James Davidheiser (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454772#comment-16454772
 ] 

James Davidheiser commented on AIRFLOW-2363:


I tested again after updating to work around some other bugs and to match the 
latest suggested Celery configuration options, and confirmed that the change in 
[https://github.com/apache/incubator-airflow/pull/3259] works for me.

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> following errors thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  

[jira] [Assigned] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread James Davidheiser (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Davidheiser reassigned AIRFLOW-2363:
--

Assignee: (was: James Davidheiser)

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> following errors thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple
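For reference, a minimal sketch of the mismatch described above; the helper names are hypothetical, and only the (log, metadata) return shape and the failing `logs[i] += log` concatenation come from the traceback:
{code:python}
# Hypothetical helpers: the Elasticsearch change made the handler's _read()
# return a (log, metadata) tuple, while FileTaskHandler.read() still
# concatenates plain strings via `logs[i] += log`.
def _read_before_change(ti, try_number):
    return "log text"                     # str, as read() expects

def _read_after_change(ti, try_number):
    return "log text", {"offset": 0}      # tuple, as the new handler returns

logs = [""]
log = _read_after_change(None, 1)
try:
    logs[0] += log                        # mirrors file_task_handler.py, line 165
except TypeError as e:
    print(e)                              # "must be str, not tuple" on Python 3.6

# A defensive read() would unpack the tuple before concatenating:
text = log[0] if isinstance(log, tuple) else log
logs[0] += text
{code}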



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread James Davidheiser (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Davidheiser reassigned AIRFLOW-2363:
--

Assignee: James Davidheiser  (was: Kevin Yang)

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: James Davidheiser
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor

2018-04-26 Thread Kevin Yang (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Yang reassigned AIRFLOW-2383:
---

Assignee: Kevin Yang

> Escape colon in partition name when poking inside NamedHivePartitionSensor
> --
>
> Key: AIRFLOW-2383
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2383
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Kevin Yang
>Assignee: Kevin Yang
>Priority: Major
>
> NamedHivePartitionSensor does not escape colons in the partition name, which 
> causes behavior different from HivePartitionSensor whenever the partition name 
> contains a colon. The colon needs to be escaped as `%3A`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor

2018-04-26 Thread Kevin Yang (JIRA)
Kevin Yang created AIRFLOW-2383:
---

 Summary: Escape colon in partition name when poking inside 
NamedHivePartitionSensor
 Key: AIRFLOW-2383
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2383
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Kevin Yang


NamedHivePartitionSensor does not escape colons in the partition name, which 
causes behavior different from HivePartitionSensor whenever the partition name 
contains a colon. The colon needs to be escaped as `%3A`.
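A minimal sketch of the proposed escaping, assuming a 'schema.table/key=value' partition format; the helper name is made up, and only the `%3A` target comes from the description:
{code:python}
from urllib.parse import quote

def escape_partition_colons(partition):
    """Hypothetical helper: escape colons in a 'schema.table/key=value'
    partition spec so it matches what HivePartitionSensor compares against."""
    table, spec = partition.split('/', 1)
    key, _, value = spec.partition('=')
    # quote() percent-encodes ':' as '%3A' while leaving '-', '.', etc. alone.
    return '{}/{}={}'.format(table, key, quote(value, safe=''))

print(escape_partition_colons('db.tbl/dt=2018-04-26T00:00:00'))
# db.tbl/dt=2018-04-26T00%3A00%3A00
{code}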



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2374) Airflow fails to show logs

2018-04-26 Thread Kevin Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454663#comment-16454663
 ] 

Kevin Yang commented on AIRFLOW-2374:
-

Hi [~b11c], I believe this bug is being handled in the following JIRA issue:

https://issues.apache.org/jira/browse/AIRFLOW-2363

> Airflow fails to show logs
> --
>
> Key: AIRFLOW-2374
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2374
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Berislav Lopac
>Assignee: Berislav Lopac
>Priority: Blocker
>
> When viewing a log in the webserver, the page shows a loading gif and the log 
> never appears. Looking in the Javascript console, the problem appears to be an 
> HTTP 500 error when loading the {{get_logs_with_metadata}} endpoint, giving the 
> following trace:
> {code:java}
> [Airflow webserver ASCII-art error banner]
> ---
> Node: airflow-nods-dev
> ---
> Traceback (most recent call last):
>   File 
> "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 
> 113, in _read
> remote_log = self.gcs_read(remote_loc)
>   File 
> "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 
> 131, in gcs_read
> return self.hook.download(bkt, blob).decode()
>   File "/opt/airflow/src/apache-airflow/airflow/contrib/hooks/gcs_hook.py", 
> line 107, in download
> .get_media(bucket=bucket, object=object) \
>   File "/usr/local/lib/python3.6/dist-packages/oauth2client/_helpers.py", 
> line 133, in positional_wrapper
> return wrapped(*args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/googleapiclient/http.py", line 
> 841, in execute
> raise HttpError(resp, content, uri=self.uri)
> googleapiclient.errors.HttpError: <HttpError 404 when requesting 
> https://www.googleapis.com/storage/v1/b/bucket-af/o/test-logs%2Fgeneric_transfer_single%2Ftransfer_file%2F2018-04-25T13%3A00%3A51.250983%2B00%3A00%2F1.log?alt=media
>  returned "Not Found">
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in 
> wsgi_app
> response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in 
> full_dispatch_request
> rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in 
> handle_user_exception
> reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in 
> reraise
> raise value
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in 
> full_dispatch_request
> rv = self.dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in 
> dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, 
> in inner
> return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 
> 368, in _run_view
> return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_login.py", line 758, in 
> decorated_view
> return func(*args, **kwargs)
>   File "/opt/airflow/src/apache-airflow/airflow/www/utils.py", line 269, in 
> wrapper
> return f(*args, **kwargs)
>   File 

[jira] [Assigned] (AIRFLOW-2382) Fix wrong description for delimiter

2018-04-26 Thread Kengo Seki (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kengo Seki reassigned AIRFLOW-2382:
---

Assignee: Kengo Seki

> Fix wrong description for delimiter
> ---
>
> Key: AIRFLOW-2382
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2382
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws, operators
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
>
> The document for S3ListOperator says:
> {code}
> :param delimiter: The delimiter by which you want to filter the objects.
> For e.g to lists the CSV files from in a directory in S3 you would use
> delimiter='.csv'.
> {code}
> {code}
> **Example**:
> The following operator would list all the CSV files from the S3
> ``customers/2018/04/`` key in the ``data`` bucket. ::
> s3_file = S3ListOperator(
> task_id='list_3s_files',
> bucket='data',
> prefix='customers/2018/04/',
> delimiter='.csv',
> aws_conn_id='aws_customers_conn'
> )
> {code}
> but it actually behaves oppositely:
> {code}
> In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator
> In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3').execute(None)
> [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe']
> In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
> aws_conn_id='s3', delimiter='.csv').execute(None)
> [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.amazonaws.com
> [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
> [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS 
> connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
> Out[3]: ['1.txt', '2.jpg', '3.exe']
> {code}
> This is because the 'delimiter' parameter represents path hierarchy (so '/' is 
> typically used), not a file extension. S3ToGoogleCloudStorageOperator has the 
> same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454591#comment-16454591
 ] 

Kyle Hamlin edited comment on AIRFLOW-2363 at 4/26/18 5:31 PM:
---

[~yrqls21] Just testing this; looks like it's working now

– 
Kyle Hamlin


was (Author: hamlinkn):
+Kevin Yang (JIRA)  Just testing this look like it
working now




-- 
Kyle Hamlin


> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple

[jira] [Resolved] (AIRFLOW-2377) Improve Sendgrid sender support

2018-04-26 Thread Siddharth Anand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siddharth Anand resolved AIRFLOW-2377.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

Issue resolved by pull request #3266
[https://github.com/apache/incubator-airflow/pull/3266]

> Improve Sendgrid sender support
> ---
>
> Key: AIRFLOW-2377
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2377
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Marcin Szymanski
>Assignee: Marcin Szymanski
>Priority: Minor
> Fix For: 2.0.0
>
>
> * Add support for sender name
>  * Allow passing sender email and name via kwargs
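A minimal usage sketch under the merged change; the addresses are made up, and SENDGRID_API_KEY is assumed to be configured:
{code:python}
from airflow.contrib.utils.sendgrid import send_email

# from_email/from_name are the new kwargs; if omitted, the utility falls back
# to the SENDGRID_MAIL_FROM / SENDGRID_MAIL_SENDER environment variables.
send_email(
    to=['ops@example.com'],              # made-up recipient for illustration
    subject='nightly report',
    html_content='<b>done</b>',
    from_email='reports@example.com',    # made-up sender for illustration
    from_name='Report Bot',
)
{code}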



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454591#comment-16454591
 ] 

Kyle Hamlin commented on AIRFLOW-2363:
--

+Kevin Yang (JIRA)  Just testing this look like it
working now




-- 
Kyle Hamlin


> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (AIRFLOW-2377) Improve Sendgrid sender support

2018-04-26 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454592#comment-16454592
 ] 

ASF subversion and git services commented on AIRFLOW-2377:
--

Commit b0d0d0a041bd5c4b19f6b97dc0fa289436743272 in incubator-airflow's branch 
refs/heads/master from [~ms32035]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=b0d0d0a ]

[AIRFLOW-2377] Improve Sendgrid sender support

Closes #3266 from ms32035/sendgrid_sender


> Improve Sendgrid sender support
> ---
>
> Key: AIRFLOW-2377
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2377
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Marcin Szymanski
>Assignee: Marcin Szymanski
>Priority: Minor
> Fix For: 2.0.0
>
>
> * Add support for sender name
>  * Allow passing sender email and name via kwargs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2377] Improve Sendgrid sender support

2018-04-26 Thread sanand
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e44688ed0 -> b0d0d0a04


[AIRFLOW-2377] Improve Sendgrid sender support

Closes #3266 from ms32035/sendgrid_sender


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0d0d0a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0d0d0a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0d0d0a0

Branch: refs/heads/master
Commit: b0d0d0a041bd5c4b19f6b97dc0fa289436743272
Parents: e44688e
Author: Marcin Szymanski 
Authored: Thu Apr 26 10:25:32 2018 -0700
Committer: r39132 
Committed: Thu Apr 26 10:25:32 2018 -0700

--
 airflow/contrib/utils/sendgrid.py|  8 +---
 tests/contrib/utils/test_sendgrid.py | 31 +++
 2 files changed, 32 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0d0d0a0/airflow/contrib/utils/sendgrid.py
--
diff --git a/airflow/contrib/utils/sendgrid.py 
b/airflow/contrib/utils/sendgrid.py
index 206bb93..9055c97 100644
--- a/airflow/contrib/utils/sendgrid.py
+++ b/airflow/contrib/utils/sendgrid.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -51,7 +51,9 @@ def send_email(to, subject, html_content, files=None,
 SENDGRID_API_KEY={your-sendgrid-api-key}.
 """
 mail = Mail()
-mail.from_email = Email(os.environ.get('SENDGRID_MAIL_FROM'))
+from_email = kwargs.get('from_email') or os.environ.get('SENDGRID_MAIL_FROM')
+from_name = kwargs.get('from_name') or os.environ.get('SENDGRID_MAIL_SENDER')
+mail.from_email = Email(from_email, from_name)
 mail.subject = subject
 
 # Add the recipient list of to emails.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0d0d0a0/tests/contrib/utils/test_sendgrid.py
--
diff --git a/tests/contrib/utils/test_sendgrid.py 
b/tests/contrib/utils/test_sendgrid.py
index 997bc55..6710076 100644
--- a/tests/contrib/utils/test_sendgrid.py
+++ b/tests/contrib/utils/test_sendgrid.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,17 +50,27 @@ class SendEmailSendGridTest(unittest.TestCase):
 'subject': 'sendgrid-send-email unit test'}
 self.personalization_custom_args = {'arg1': 'val1', 'arg2': 'val2'}
 self.categories = ['cat1', 'cat2']
+# extras
 self.expected_mail_data_extras = copy.deepcopy(self.expected_mail_data)
self.expected_mail_data_extras['personalizations'][0]['custom_args'] = \
self.personalization_custom_args
 self.expected_mail_data_extras['categories'] = self.categories
+self.expected_mail_data_extras['from'] = \
+{'name': 'Foo', 'email': 'f...@bar.com'}
+# sender
+self.expected_mail_data_sender = copy.deepcopy(self.expected_mail_data)
+self.expected_mail_data_sender['from'] = \
+{'name': 'Foo Bar', 'email': 'f...@foo.bar'}
 
 # Test the right email is constructed.
 
 @mock.patch('os.environ.get')
 @mock.patch('airflow.contrib.utils.sendgrid._post_sendgrid_mail')
 def test_send_email_sendgrid_correct_email(self, mock_post, mock_get):
-mock_get.return_value = 'f...@bar.com'
+def get_return(var):
+return {'SENDGRID_MAIL_FROM': 'f...@bar.com'}.get(var)
+
+mock_get.side_effect = get_return
 send_email(self.to, self.subject, self.html_content, cc=self.cc, 
bcc=self.bcc)
 mock_post.assert_called_with(self.expected_mail_data)
 
@@ -68,8 +78,21 @@ class SendEmailSendGridTest(unittest.TestCase):
 @mock.patch('os.environ.get')
 @mock.patch('airflow.contrib.utils.sendgrid._post_sendgrid_mail')
 def test_send_email_sendgrid_correct_email_extras(self, mock_post, 
mock_get):
-mock_get.return_value = 'f...@bar.com'
+def get_return(var):
+   

[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454533#comment-16454533
 ] 

Kyle Hamlin commented on AIRFLOW-2363:
--

Yeah, trying it now. Was there a new field (executor_config) added to the
task_instance table?




-- 
Kyle Hamlin


> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Created] (AIRFLOW-2382) Fix wrong description for delimiter

2018-04-26 Thread Kengo Seki (JIRA)
Kengo Seki created AIRFLOW-2382:
---

 Summary: Fix wrong description for delimiter
 Key: AIRFLOW-2382
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2382
 Project: Apache Airflow
  Issue Type: Bug
  Components: aws, operators
Reporter: Kengo Seki


The document for S3ListOperator says:

{code}
:param delimiter: The delimiter by which you want to filter the objects.
For e.g to lists the CSV files from in a directory in S3 you would use
delimiter='.csv'.
{code}

{code}
**Example**:
The following operator would list all the CSV files from the S3
``customers/2018/04/`` key in the ``data`` bucket. ::

s3_file = S3ListOperator(
task_id='list_3s_files',
bucket='data',
prefix='customers/2018/04/',
delimiter='.csv',
aws_conn_id='aws_customers_conn'
)
{code}

but it actually behaves oppositely:

{code}
In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator

In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', 
aws_conn_id='s3').execute(None)
[2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3.amazonaws.com
[2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
[2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe']

In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', aws_conn_id='s3', 
delimiter='.csv').execute(None)
[2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3.amazonaws.com
[2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3-ap-northeast-1.amazonaws.com
[2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): bkt0.s3.ap-northeast-1.amazonaws.com
Out[3]: ['1.txt', '2.jpg', '3.exe']
{code}

This is because the 'delimiter' parameter represents path hierarchy (so '/' is 
typically used), not a file extension. S3ToGoogleCloudStorageOperator has the 
same problem.
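A short sketch of a working pattern, reusing the bucket, prefix, and connection names from the docstring above: pass a path delimiter and filter extensions on the returned keys. The task id is illustrative.
{code:python}
from airflow.contrib.operators.s3_list_operator import S3ListOperator

# 'delimiter' groups keys by path hierarchy (typically '/'); it does not
# filter by file extension, so that has to happen on the key list itself.
keys = S3ListOperator(
    task_id='list_customer_files',
    bucket='data',
    prefix='customers/2018/04/',
    delimiter='/',
    aws_conn_id='aws_customers_conn',
).execute(None)

csv_keys = [k for k in keys if k.endswith('.csv')]
{code}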



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kevin Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454406#comment-16454406
 ] 

Kevin Yang commented on AIRFLOW-2363:
-

[~hamlinkn] Do you mind trying out the change in this PR 
[https://github.com/apache/incubator-airflow/pull/3259] to see if it resolves 
the issue?

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple

[jira] [Issue Comment Deleted] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Hamlin updated AIRFLOW-2363:
-
Comment: was deleted

(was: I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 
and
I have been have no problems




-- 
Kyle Hamlin
)

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454306#comment-16454306
 ] 

Kyle Hamlin commented on AIRFLOW-2363:
--

I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and I 
there have been no problems since
 

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454306#comment-16454306
 ] 

Kyle Hamlin edited comment on AIRFLOW-2363 at 4/26/18 2:27 PM:
---

I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and 
there have been no problems since
  


was (Author: hamlinkn):
I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and I 
there have been no problems since
 

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> 

[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kyle Hamlin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454305#comment-16454305
 ] 

Kyle Hamlin commented on AIRFLOW-2363:
--

I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and I
have had no problems since.




-- 
Kyle Hamlin


> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in 
> decorated_view
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 
> 269, in wrapper
>      return f(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 74, in wrapper
>      return func(*args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 
> 770, in get_logs_with_metadata
>      logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
>  line 165, in read
>      logs[i] += log
>  TypeError: must be str, not tuple
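A minimal illustration of the failure mode above (editor's sketch; the values
are made up, not taken from the Airflow code base): once a handler's read path
starts returning (log, metadata) tuples, any caller that still concatenates
plain strings fails exactly as in the trace.
{code:java}
# Sketch of the failure: the new Elasticsearch-style API returns a tuple,
# but file_task_handler still does logs[i] += log on a list of strings.
logs = ['']                                     # one str per try_number
log = ('some log text', {'end_of_log': True})   # tuple-style return value
logs[0] += log                                  # TypeError: must be str, not tuple
# The string part would need to be appended instead: logs[0] += log[0]
{code}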



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Created] (AIRFLOW-2381) Fix the Flaky ApiPasswordTests

2018-04-26 Thread Fokko Driesprong (JIRA)
Fokko Driesprong created AIRFLOW-2381:
-

 Summary: Fix the Flaky ApiPasswordTests
 Key: AIRFLOW-2381
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2381
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Fokko Driesprong


Currently the ApiPasswordTests fail because the DAG is not available in the
database. I believe this is caused by the different tests running in parallel.
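A minimal sketch of one way to remove the ordering dependency, assuming the
test seeds the DAG row itself (the helper below is hypothetical, not the
actual test code):
{code:java}
# Hypothetical setUp helper: make sure the DagModel row the API test looks
# up exists, instead of relying on another test having created it first.
from airflow import models, settings

def seed_dag_record(dag_id):
    session = settings.Session()
    if not session.query(models.DagModel).filter_by(dag_id=dag_id).first():
        session.add(models.DagModel(dag_id=dag_id))
        session.commit()
    session.close()
{code}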



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work started] (AIRFLOW-2380) Add support for environment variables in Spark submit operator

2018-04-26 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-2380 started by Cristòfol Torrens.
--
> Add support for environment variables in Spark submit operator
> --
>
> Key: AIRFLOW-2380
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2380
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib, operators
>Reporter: Cristòfol Torrens
>Assignee: Cristòfol Torrens
>Priority: Minor
>
> Add support for environment variables in Spark submit operator.
> For example, to pass *HADOOP_CONF_DIR* when the same Spark cluster is used 
> with multiple HDFS instances.
> The idea is to pass the variables as a dict and resolve them when running in 
> *yarn*-_client/cluster_ and *standalone*-_client_ mode.
> In *standalone*-_cluster_ mode this is not possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2380) Add support for environment variables in Spark submit operator

2018-04-26 Thread JIRA
Cristòfol Torrens created AIRFLOW-2380:
--

 Summary: Add support for environment variables in Spark submit 
operator
 Key: AIRFLOW-2380
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2380
 Project: Apache Airflow
  Issue Type: Improvement
  Components: contrib, operators
Reporter: Cristòfol Torrens
Assignee: Cristòfol Torrens


Add support for environment variables in Spark submit operator.

For example, to pass *HADOOP_CONF_DIR* when the same Spark cluster is used 
with multiple HDFS instances.

The idea is to pass the variables as a dict and resolve them when running in 
*yarn*-_client/cluster_ and *standalone*-_client_ mode.

In *standalone*-_cluster_ mode this is not possible.
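A sketch of how the proposed interface might look; the {{env_vars}} parameter
name is an assumption here, since the issue only proposes passing a dict:
{code:java}
# Hypothetical usage of the proposed parameter on SparkSubmitOperator.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    application='/path/to/app.py',
    conn_id='spark_default',
    # Resolved for yarn-client/cluster and standalone-client modes only:
    env_vars={'HADOOP_CONF_DIR': '/etc/hadoop/conf-cluster-b'},
    dag=dag,
)
{code}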



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work started] (AIRFLOW-2222) GoogleCloudStorageHook.copy fails for large files between locations

2018-04-26 Thread Berislav Lopac (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-2222 started by Berislav Lopac.
---
> GoogleCloudStorageHook.copy fails for large files between locations
> ---
>
> Key: AIRFLOW-2222
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2222
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Berislav Lopac
>Assignee: Berislav Lopac
>Priority: Major
> Fix For: 2.0.0
>
>
> When copying large files (confirmed for around 3GB) between buckets in 
> different projects, the operation fails and the Google API returns error 
> [413—Payload Too 
> Large|https://cloud.google.com/storage/docs/json_api/v1/status-codes#413_Payload_Too_Large].
>  The documentation for the error says:
> {quote}The Cloud Storage JSON API supports up to 5 TB objects.
> This error may, alternatively, arise if copying objects between locations 
> and/or storage classes can not complete within 30 seconds. In this case, use 
> the 
> [Rewrite|https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite] 
> method instead.{quote}
> The reason seems to be that {{GoogleCloudStorageHook.copy}} uses the API's 
> {{copy}} method.
> h3. Proposed Solution
> There are two potential solutions:
> # Implement a {{GoogleCloudStorageHook.rewrite}} method which can be called 
> from operators and other objects to ensure successful execution (a sketch 
> follows below). This method is more flexible but requires changes both in the 
> {{GoogleCloudStorageHook}} class and in any other classes that use it for 
> copying files, to ensure that they explicitly call {{rewrite}} when needed.
> # Modify {{GoogleCloudStorageHook.copy}} to decide when to use {{rewrite}} 
> instead of {{copy}} underneath. This requires updating only the 
> {{GoogleCloudStorageHook}} class, but the logic might not cover all the edge 
> cases and could be difficult to implement.
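A minimal sketch of option 1, assuming the hook's existing googleapiclient
connection; the JSON API's {{rewrite}} call is resumable, so it has to be
driven in a loop until the response reports {{done}}:
{code:java}
# Editor's sketch of a possible GoogleCloudStorageHook.rewrite (option 1);
# placement and naming are assumptions based on the existing hook.
def rewrite(self, source_bucket, source_object,
            destination_bucket, destination_object):
    service = self.get_conn()
    token = None
    while True:
        response = service.objects().rewrite(
            sourceBucket=source_bucket,
            sourceObject=source_object,
            destinationBucket=destination_bucket,
            destinationObject=destination_object,
            rewriteToken=token,
            body='').execute()
        if response['done']:
            return True
        # Not finished within this call: resume with the returned token.
        token = response['rewriteToken']
{code}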



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work started] (AIRFLOW-2374) Airflow fails to show logs

2018-04-26 Thread Berislav Lopac (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-2374 started by Berislav Lopac.
---
> Airflow fails to show logs
> --
>
> Key: AIRFLOW-2374
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2374
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Berislav Lopac
>Assignee: Berislav Lopac
>Priority: Blocker
>
> When viewing a log in the webserver, the page shows a loading gif and the log 
> never appears. Looking in the JavaScript console, the problem appears to be an 
> error 500 when loading the {{get_logs_with_metadata}} endpoint, giving the 
> following trace:
> {code:java}
> [Airflow error-page ASCII-art banner omitted]
> ---
> Node: airflow-nods-dev
> ---
> Traceback (most recent call last):
>   File 
> "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 
> 113, in _read
> remote_log = self.gcs_read(remote_loc)
>   File 
> "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 
> 131, in gcs_read
> return self.hook.download(bkt, blob).decode()
>   File "/opt/airflow/src/apache-airflow/airflow/contrib/hooks/gcs_hook.py", 
> line 107, in download
> .get_media(bucket=bucket, object=object) \
>   File "/usr/local/lib/python3.6/dist-packages/oauth2client/_helpers.py", 
> line 133, in positional_wrapper
> return wrapped(*args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/googleapiclient/http.py", line 
> 841, in execute
> raise HttpError(resp, content, uri=self.uri)
> googleapiclient.errors.HttpError: <HttpError 404 when requesting 
> https://www.googleapis.com/storage/v1/b/bucket-af/o/test-logs%2Fgeneric_transfer_single%2Ftransfer_file%2F2018-04-25T13%3A00%3A51.250983%2B00%3A00%2F1.log?alt=media
>  returned "Not Found">
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in 
> wsgi_app
> response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in 
> full_dispatch_request
> rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in 
> handle_user_exception
> reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in 
> reraise
> raise value
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in 
> full_dispatch_request
> rv = self.dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in 
> dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, 
> in inner
> return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 
> 368, in _run_view
> return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_login.py", line 758, in 
> decorated_view
> return func(*args, **kwargs)
>   File "/opt/airflow/src/apache-airflow/airflow/www/utils.py", line 269, in 
> wrapper
> return f(*args, **kwargs)
>   File "/opt/airflow/src/apache-airflow/airflow/utils/db.py", line 74, in 
> wrapper
> return func(*args, **kwargs)
>   File 

[jira] [Work started] (AIRFLOW-2361) Allow GoogleCloudStorageToGoogleCloudStorageOperator to store list of copied files to XCom

2018-04-26 Thread Berislav Lopac (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-2361 started by Berislav Lopac.
---
> Allow GoogleCloudStorageToGoogleCloudStorageOperator to store list of copied 
> files to XCom
> --
>
> Key: AIRFLOW-2361
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2361
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Berislav Lopac
>Assignee: Berislav Lopac
>Priority: Minor
>
> When {{GoogleCloudStorageToGoogleCloudStorageOperator}} is used with a 
> wildcard, it can copy more than one file. It would be useful to have a 
> mechanism that stores the list of copied files in XCom so it can be used by 
> downstream tasks.
> Proposed solution: Add an {{xcom_push}} flag argument to the constructor; if 
> {{True}}, the {{execute}} method returns a list of two-tuples, one per 
> source/destination file pair:
> {code:java}
> [
> ("original/file/path.ext", "target/prefix/original/file/path.ext"),
> ...
> ]{code}
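A sketch of how {{execute}} might implement the proposal (the wildcard
resolution is elided and the helper and flag names are hypothetical):
{code:java}
# Editor's sketch of the proposed behaviour, not the actual operator code.
def execute(self, context):
    copied = []
    for source_object in self._resolve_wildcard():  # hypothetical helper
        destination_object = self.destination_object + source_object
        self.hook.copy(self.source_bucket, source_object,
                       self.destination_bucket, destination_object)
        copied.append((source_object, destination_object))
    if self.xcom_push_files:  # the proposed flag, name illustrative
        return copied  # a value returned from execute() is pushed to XCom
{code}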



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2379) Logging: GCSTaskHandler does not upload the logs

2018-04-26 Thread Berislav Lopac (JIRA)
Berislav Lopac created AIRFLOW-2379:
---

 Summary: Logging: GCSTaskHandler does not upload the logs
 Key: AIRFLOW-2379
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2379
 Project: Apache Airflow
  Issue Type: Bug
  Components: logging
Reporter: Berislav Lopac


In the current master, when using {{GCSTaskHandler}} the logs are not uploaded 
to the GCS bucket. I've narrowed it down to the handler not being able to find 
the local log file in the {{.close}} method, which in turn happens because the 
method {{.set_context}} is not called in some cases, specifically at the end of 
the run when the log is supposed to be uploaded.

I'll keep exploring the issue when I have time, but perhaps someone with a 
better understanding of the logging mechanism can provide some insight.
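A defensive sketch of what {{.close}} could do when {{.set_context}} never
ran, assuming the handler stores the relative log path in {{set_context}}
(the attribute names below are assumptions; `os` is the stdlib module):
{code:java}
# Editor's sketch: make close() a no-op when set_context() was never called,
# since there is then no local log file to upload.
def close(self):
    if self.closed:
        return
    if not getattr(self, 'log_relative_path', None):
        self.closed = True
        return  # set_context() never ran; nothing to upload
    local_loc = os.path.join(self.local_base, self.log_relative_path)
    remote_loc = os.path.join(self.remote_base, self.log_relative_path)
    if os.path.exists(local_loc):
        with open(local_loc) as logfile:
            self.gcs_write(logfile.read(), remote_loc)
    self.closed = True
{code}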



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str

2018-04-26 Thread Kevin Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453663#comment-16453663
 ] 

Kevin Yang commented on AIRFLOW-2363:
-

[~jdavidh], it was a tricky one to debug. I actually think the NoneType error 
existed even before 5cb530b455be54e6b58eae19c8c10ef8f5cf955d was merged (at 
least in my naive setup with S3). That error blocks one of the upload attempts 
(there are actually multiple attempts, one whenever the S3 task handler is 
closed), and the attempt that was not blocked got removed by 
5cb530b455be54e6b58eae19c8c10ef8f5cf955d; I made a fix for that in the PR.

I'm going to assume the root cause is that the upload called in atexit() was 
killed when the subprocess ended, so the upload could not finish (according to 
my debugging logs). But I believe there is more to this task-handler closing 
issue, and it will need some more work to be perfect. I'm going to stop here 
due to a priority change, but I would be very curious to know all the details 
if you decide to dig to the bottom of it.
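For anyone picking this up, a small sketch of the kind of explicit flush that
avoids depending on atexit() (editor's illustration, not the merged fix):
{code:java}
# Close the task logger's handlers explicitly before the worker subprocess
# exits, so the final remote upload is not cut short by an atexit() hook
# that never runs to completion.
import logging

def flush_task_log_handlers(logger_name='airflow.task'):
    logger = logging.getLogger(logger_name)
    for handler in list(logger.handlers):
        handler.close()  # S3TaskHandler.close() performs the S3 upload
{code}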

> S3 remote logging appending tuple instead of str
> 
>
> Key: AIRFLOW-2363
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Reporter: Kyle Hamlin
>Assignee: Kevin Yang
>Priority: Major
> Fix For: 1.10.0
>
>
> A recent merge into master that added support for Elasticsearch logging seems 
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>  
> The following errors are thrown:
>  
> *Session NoneType error*
>  Traceback (most recent call last):
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
>  line 171, in s3_write
>      encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 274, in load_string
>      encrypt=encrypt)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 313, in load_bytes
>      client = self.get_conn()
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", 
> line 34, in get_conn
>      return self.get_client_type('s3')
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 151, in get_client_type
>      session, endpoint_url = self._get_credentials(region_name)
>    File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", 
> line 97, in _get_credentials
>      connection_object = self.get_connection(self.aws_conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 82, in get_connection
>      conn = random.choice(cls.get_connections(conn_id))
>    File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
> line 77, in get_connections
>      conns = cls._get_connections_from_db(conn_id)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 72, in wrapper
>      with create_session() as session:
>    File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>      return next(self.gen)
>    File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 
> 41, in create_session
>      session = settings.Session()
>  TypeError: 'NoneType' object is not callable
>  
> *TypeError must be str not tuple*
>  [2018-04-16 18:37:28,200] ERROR in app: Exception on 
> /admin/airflow/get_logs_with_metadata [GET]
>  Traceback (most recent call last):
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in 
> wsgi_app
>      response = self.full_dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in 
> full_dispatch_request
>      rv = self.handle_user_exception(e)
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in 
> handle_user_exception
>      reraise(exc_type, exc_value, tb)
>    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, 
> in reraise
>      raise value
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in 
> full_dispatch_request
>      rv = self.dispatch_request()
>    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in 
> dispatch_request
>      return self.view_functions[rule.endpoint](**req.view_args)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 69, in inner
>      return self._run_view(f, *args, **kwargs)
>    File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 
> 368, in _run_view
>      return fn(self, *args, **kwargs)
>    File 

incubator-airflow git commit: [AIRFLOW-2331] Support init action timeout on dataproc cluster create

2018-04-26 Thread fokko
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 84cfbf6a1 -> cb264e940


[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master

(cherry picked from commit e44688ed090a438a20751e11cc96c72554630f1d)
Signed-off-by: Fokko Driesprong 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb264e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb264e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb264e94

Branch: refs/heads/v1-10-test
Commit: cb264e940e90a865726ea788e5fd3c2f98f4817e
Parents: 84cfbf6
Author: Cristòfol Torrens 
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong 
Committed: Thu Apr 26 10:00:17 2018 +0200

--
 airflow/contrib/operators/dataproc_operator.py  | 29 ++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++--
 2 files changed, 36 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/airflow/contrib/operators/dataproc_operator.py
--
diff --git a/airflow/contrib/operators/dataproc_operator.py 
b/airflow/contrib/operators/dataproc_operator.py
index 728fae9..56ebb91 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,10 @@
 
 import ntpath
 import os
+import re
 import time
 import uuid
+from datetime import timedelta
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -57,6 +59,9 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param init_actions_uris: List of GCS uri's containing
         dataproc initialization scripts
     :type init_actions_uris: list[string]
+    :param init_action_timeout: Amount of time executable scripts in
+        init_actions_uris has to complete
+    :type init_action_timeout: string
     :param metadata: dict of key-value google compute engine metadata entries
         to add to all instances
     :type metadata: dict
@@ -115,6 +120,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
+                 init_action_timeout="10m",
                  metadata=None,
                  image_version=None,
                  properties=None,
@@ -141,6 +147,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.num_preemptible_workers = num_preemptible_workers
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.image_version = image_version
         self.properties = properties
@@ -206,6 +213,19 @@ class DataprocClusterCreateOperator(BaseOperator):
 return
 time.sleep(15)
 
+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(timedelta(minutes=val).seconds)
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
     def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
@@ -276,7 +296,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['properties'] = \
                 self.properties
         if self.init_actions_uris:
             init_actions_dict = [
-                {'executableFile': uri} for uri in self.init_actions_uris
+                {
+                    'executableFile': uri,
+                    'executionTimeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
             ]
 

[jira] [Resolved] (AIRFLOW-2331) Add support for initialization action timeout on dataproc cluster creation.

2018-04-26 Thread Fokko Driesprong (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong resolved AIRFLOW-2331.
---
   Resolution: Fixed
Fix Version/s: (was: Airflow 2.0)
   2.0.0

Issue resolved by pull request #3235
[https://github.com/apache/incubator-airflow/pull/3235]

> Add support for initialization action timeout on dataproc cluster creation.
> ---
>
> Key: AIRFLOW-2331
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2331
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Cristòfol Torrens
>Assignee: Cristòfol Torrens
>Priority: Minor
>  Labels: contrib, dataproc, operator
> Fix For: 2.0.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Add support for customizing the timeout for initialization scripts.
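Based on the merged diff elsewhere in this digest, the new parameter accepts
values such as "30s" or "10m"; a usage sketch (every argument except
{{init_action_timeout}} is an illustrative placeholder):
{code:java}
from airflow.contrib.operators.dataproc_operator import (
    DataprocClusterCreateOperator)

create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name='analytics-cluster',
    project_id='my-gcp-project',
    num_workers=2,
    zone='europe-west1-b',
    init_actions_uris=['gs://my-bucket/bootstrap.sh'],
    init_action_timeout='15m',  # accepted forms: "<n>s" or "<n>m"
    dag=dag,
)
{code}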



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


incubator-airflow git commit: [AIRFLOW-2331] Support init action timeout on dataproc cluster create

2018-04-26 Thread fokko
Repository: incubator-airflow
Updated Branches:
  refs/heads/master dde066d00 -> e44688ed0


[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e44688ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e44688ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e44688ed

Branch: refs/heads/master
Commit: e44688ed090a438a20751e11cc96c72554630f1d
Parents: dde066d
Author: Cristòfol Torrens 
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong 
Committed: Thu Apr 26 09:59:51 2018 +0200

--
 airflow/contrib/operators/dataproc_operator.py  | 29 ++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++--
 2 files changed, 36 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e44688ed/airflow/contrib/operators/dataproc_operator.py
--
diff --git a/airflow/contrib/operators/dataproc_operator.py 
b/airflow/contrib/operators/dataproc_operator.py
index 728fae9..56ebb91 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,10 @@
 
 import ntpath
 import os
+import re
 import time
 import uuid
+from datetime import timedelta
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -57,6 +59,9 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param init_actions_uris: List of GCS uri's containing
         dataproc initialization scripts
     :type init_actions_uris: list[string]
+    :param init_action_timeout: Amount of time executable scripts in
+        init_actions_uris has to complete
+    :type init_action_timeout: string
     :param metadata: dict of key-value google compute engine metadata entries
         to add to all instances
     :type metadata: dict
@@ -115,6 +120,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
+                 init_action_timeout="10m",
                  metadata=None,
                  image_version=None,
                  properties=None,
@@ -141,6 +147,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.num_preemptible_workers = num_preemptible_workers
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.image_version = image_version
         self.properties = properties
@@ -206,6 +213,19 @@ class DataprocClusterCreateOperator(BaseOperator):
 return
 time.sleep(15)
 
+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(timedelta(minutes=val).seconds)
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
     def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
@@ -276,7 +296,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['properties'] = \
                 self.properties
         if self.init_actions_uris:
             init_actions_dict = [
-                {'executableFile': uri} for uri in self.init_actions_uris
+                {
+                    'executableFile': uri,
+                    'executionTimeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
             ]
             cluster_data['config']['initializationActions'] = init_actions_dict
         if self.service_account:


[jira] [Commented] (AIRFLOW-2331) Add support for initialization action timeout on dataproc cluster creation.

2018-04-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453630#comment-16453630
 ] 

Cristòfol Torrens commented on AIRFLOW-2331:


Pull request approved: https://github.com/apache/incubator-airflow/pull/3235

> Add support for initialization action timeout on dataproc cluster creation.
> ---
>
> Key: AIRFLOW-2331
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2331
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Cristòfol Torrens
>Assignee: Cristòfol Torrens
>Priority: Minor
>  Labels: contrib, dataproc, operator
> Fix For: Airflow 2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Add support for customizing the timeout for initialization scripts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)