[jira] [Commented] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered
[ https://issues.apache.org/jira/browse/AIRFLOW-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455717#comment-16455717 ] Arthur Wiedmer commented on AIRFLOW-2385: - Hi Yohei, Unless I am mistaken, it looks like your operator is executing a Spark job (I seem to recognize the progress bar from the logs). execution_timeout will only raise an exception in the Python process, but it might not kill the job. You probably want to implement the on_kill method for your operator so that it knows how to clean up your process. It has already been implemented in a few operators in the code base. Good luck! > Airflow task is not stopped when execution timeout gets triggered > - > > Key: AIRFLOW-2385 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2385 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.9.0 >Reporter: Yohei Onishi >Priority: Major > > I have my own custom operator extending BaseOperator as follows. I tried to > kill a task if it runs for more than 30 minutes. The timeout seems to be > triggered according to the log, but the task still continued. > Am I missing something? I checked the official document but do not know what > is wrong. [https://airflow.apache.org/code.html#baseoperator] > My operator is as follows. > {code:java} > class MyOperator(BaseOperator): > @apply_defaults > def __init__( > self, > some_parameters_here, > *args, > **kwargs): > super(MyOperator, self).__init__(*args, **kwargs) > # some initialization here > def execute(self, context): > # some code here > {code} > > My task is as follows. > {code:java} > t = MyOperator( > task_id='task', > dag=scheduled_dag, > execution_timeout=timedelta(minutes=30)) > {code} > > I found this error but the task continued. 
> {code:java} > [2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage > 6:==(1380 + -160) / > 1224][2018-04-12 03:30:28,353] {timeout.py:36} ERROR - Process timed out > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
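Arthur's suggestion can be sketched without Airflow installed. The class below is a plain-Python stand-in (in real code it would subclass BaseOperator, and the child process would be something like spark-submit rather than `sleep`); the point is that `execute` keeps a handle to the external process so `on_kill` can clean it up:

```python
import subprocess

class MyOperator:  # in real Airflow: class MyOperator(BaseOperator)
    """Sketch of the on_kill cleanup pattern, not Airflow's actual code."""

    def __init__(self, cmd):
        self.cmd = cmd          # e.g. a spark-submit command line
        self.process = None

    def execute(self, context):
        # Keep a handle to the child process so on_kill can reach it later.
        self.process = subprocess.Popen(self.cmd)
        return self.process.wait()

    def on_kill(self):
        # Airflow calls on_kill when the task is killed (e.g. after
        # execution_timeout fires); without this the external job keeps running.
        if self.process is not None and self.process.poll() is None:
            self.process.terminate()
            self.process.wait()
```

Without an `on_kill` like this, the timeout only stops the Python side of the task, which matches the behavior Yohei reports.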
[jira] [Created] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered
Yohei Onishi created AIRFLOW-2385: - Summary: Airflow task is not stopped when execution timeout gets triggered Key: AIRFLOW-2385 URL: https://issues.apache.org/jira/browse/AIRFLOW-2385 Project: Apache Airflow Issue Type: Bug Components: DAG Affects Versions: 1.9.0 Reporter: Yohei Onishi I have my own custom operator extending BaseOperator as follows. I tried to kill a task if it runs for more than 30 minutes. The timeout seems to be triggered according to the log, but the task still continued. Am I missing something? I checked the official document but do not know what is wrong. [https://airflow.apache.org/code.html#baseoperator] My operator is as follows. {code:java} class MyOperator(BaseOperator): @apply_defaults def __init__( self, some_parameters_here, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) # some initialization here def execute(self, context): # some code here {code} My task is as follows. {code:java} t = MyOperator( task_id='task', dag=scheduled_dag, execution_timeout=timedelta(minutes=30)) {code} I found this error but the task continued. {code:java} [2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 6:==(1380 + -160) / 1224][2018-04-12 03:30:28,353] {timeout.py:36} ERROR - Process timed out {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
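The "Process timed out" line comes from Airflow's timeout.py, which enforces execution_timeout by raising an exception inside the Python process. A Unix-only sketch of that mechanism (an approximation, not Airflow's code; `AirflowTaskTimeout` here is a local stand-in) shows why the exception cannot touch an externally launched job:

```python
import signal

class AirflowTaskTimeout(Exception):
    """Local stand-in for Airflow's timeout exception (sketch only)."""

def handle_timeout(signum, frame):
    raise AirflowTaskTimeout("Process timed out")

# SIGALRM interrupts only this Python process; a Spark job launched as a
# separate process never receives it and keeps running.
signal.signal(signal.SIGALRM, handle_timeout)
signal.alarm(1)

timed_out = False
try:
    signal.pause()  # stand-in for the operator's long-running execute()
except AirflowTaskTimeout:
    timed_out = True
signal.alarm(0)  # cancel any pending alarm
```

So the timeout fires and the task fails, but any subprocess survives unless the operator cleans it up explicitly.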
[jira] [Resolved] (AIRFLOW-2378) Add Groupon to README
[ https://issues.apache.org/jira/browse/AIRFLOW-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand resolved AIRFLOW-2378. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3267 [https://github.com/apache/incubator-airflow/pull/3267] > Add Groupon to README > - > > Key: AIRFLOW-2378 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2378 > Project: Apache Airflow > Issue Type: Wish >Reporter: steven casey >Assignee: steven casey >Priority: Trivial > Fix For: 2.0.0 > > > Add Groupon to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2378) Add Groupon to README
[ https://issues.apache.org/jira/browse/AIRFLOW-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455645#comment-16455645 ] ASF subversion and git services commented on AIRFLOW-2378: -- Commit 801fe7dbdcc74988fe937e013d7f019df87c44e5 in incubator-airflow's branch refs/heads/master from [~stevencasey] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=801fe7d ] [AIRFLOW-2378] Add Groupon to list of current users Closes #3267 from stevencasey/add_groupon > Add Groupon to README > - > > Key: AIRFLOW-2378 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2378 > Project: Apache Airflow > Issue Type: Wish >Reporter: steven casey >Assignee: steven casey >Priority: Trivial > Fix For: 2.0.0 > > > Add Groupon to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2378] Add Groupon to list of current users
Repository: incubator-airflow Updated Branches: refs/heads/master ae63246a1 -> 801fe7dbd [AIRFLOW-2378] Add Groupon to list of current users Closes #3267 from stevencasey/add_groupon Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/801fe7db Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/801fe7db Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/801fe7db Branch: refs/heads/master Commit: 801fe7dbdcc74988fe937e013d7f019df87c44e5 Parents: ae63246 Author: steven casey <265229+stevenca...@users.noreply.github.com> Authored: Thu Apr 26 18:47:28 2018 -0700 Committer: r39132Committed: Thu Apr 26 18:47:28 2018 -0700 -- README.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/801fe7db/README.md -- diff --git a/README.md b/README.md index 5e5193c..b504cbd 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,7 @@ Currently **officially** using Airflow: 1. [GovTech GDS](https://gds-gov.tech) [[@chrissng](https://github.com/chrissng) & [@datagovsg](https://github.com/datagovsg)] 1. [Grand Rounds](https://www.grandrounds.com/) [[@richddr](https://github.com/richddr), [@timz1290](https://github.com/timz1290), [@wenever](https://github.com/@wenever), & [@runongirlrunon](https://github.com/runongirlrunon)] 1. [Groupalia](http://es.groupalia.com) [[@jesusfcr](https://github.com/jesusfcr)] +1. [Groupon](https://groupon.com) [[@stevencasey](https://github.com/stevencasey)] 1. [Gusto](https://gusto.com) [[@frankhsu](https://github.com/frankhsu)] 1. [Handshake](https://joinhandshake.com/) [[@mhickman](https://github.com/mhickman)] 1. [Handy](http://www.handy.com/careers/73115?gh_jid=73115_src=o5qcxn) [[@marcintustin](https://github.com/marcintustin) / [@mtustin-handy](https://github.com/mtustin-handy)]
[jira] [Commented] (AIRFLOW-2382) Fix wrong description for delimiter
[ https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455642#comment-16455642 ] ASF subversion and git services commented on AIRFLOW-2382: -- Commit ae63246a1d580d77c366276300139165e4c984de in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ae63246 ] [AIRFLOW-2382] Fix wrong description for delimiter Fix misleading descriptions for the 'delimiter' parameter in S3ListOperator and S3ToGoogleCloudStorageOperator's docstring. Closes #3270 from sekikn/AIRFLOW-2382 > Fix wrong description for delimiter > --- > > Key: AIRFLOW-2382 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2382 > Project: Apache Airflow > Issue Type: Bug > Components: aws, operators >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > The document for S3ListOperator says: > {code} > :param delimiter: The delimiter by which you want to filter the objects. > For e.g to lists the CSV files from in a directory in S3 you would use > delimiter='.csv'. > {code} > {code} > **Example**: > The following operator would list all the CSV files from the S3 > ``customers/2018/04/`` key in the ``data`` bucket. 
:: > s3_file = S3ListOperator( > task_id='list_3s_files', > bucket='data', > prefix='customers/2018/04/', > delimiter='.csv', > aws_conn_id='aws_customers_conn' > ) > {code} > but it actually behaves oppositely: > {code} > In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator > In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3').execute(None) > [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe'] > In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3', delimiter='.csv').execute(None) > [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[3]: ['1.txt', '2.jpg', '3.exe'] > {code} > This is because that the 'delimiter' parameter is for representing path > hierarchy (so '/' is used typically), not file extension. Also > S3ToGoogleCloudStorageOperator has the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2382) Fix wrong description for delimiter
[ https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455643#comment-16455643 ] ASF subversion and git services commented on AIRFLOW-2382: -- Commit ae63246a1d580d77c366276300139165e4c984de in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ae63246 ] [AIRFLOW-2382] Fix wrong description for delimiter Fix misleading descriptions for the 'delimiter' parameter in S3ListOperator and S3ToGoogleCloudStorageOperator's docstring. Closes #3270 from sekikn/AIRFLOW-2382 > Fix wrong description for delimiter > --- > > Key: AIRFLOW-2382 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2382 > Project: Apache Airflow > Issue Type: Bug > Components: aws, operators >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > The document for S3ListOperator says: > {code} > :param delimiter: The delimiter by which you want to filter the objects. > For e.g to lists the CSV files from in a directory in S3 you would use > delimiter='.csv'. > {code} > {code} > **Example**: > The following operator would list all the CSV files from the S3 > ``customers/2018/04/`` key in the ``data`` bucket. 
:: > s3_file = S3ListOperator( > task_id='list_3s_files', > bucket='data', > prefix='customers/2018/04/', > delimiter='.csv', > aws_conn_id='aws_customers_conn' > ) > {code} > but it actually behaves oppositely: > {code} > In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator > In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3').execute(None) > [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe'] > In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3', delimiter='.csv').execute(None) > [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[3]: ['1.txt', '2.jpg', '3.exe'] > {code} > This is because that the 'delimiter' parameter is for representing path > hierarchy (so '/' is used typically), not file extension. Also > S3ToGoogleCloudStorageOperator has the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2382) Fix wrong description for delimiter
[ https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand resolved AIRFLOW-2382. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3270 [https://github.com/apache/incubator-airflow/pull/3270] > Fix wrong description for delimiter > --- > > Key: AIRFLOW-2382 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2382 > Project: Apache Airflow > Issue Type: Bug > Components: aws, operators >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > The document for S3ListOperator says: > {code} > :param delimiter: The delimiter by which you want to filter the objects. > For e.g to lists the CSV files from in a directory in S3 you would use > delimiter='.csv'. > {code} > {code} > **Example**: > The following operator would list all the CSV files from the S3 > ``customers/2018/04/`` key in the ``data`` bucket. :: > s3_file = S3ListOperator( > task_id='list_3s_files', > bucket='data', > prefix='customers/2018/04/', > delimiter='.csv', > aws_conn_id='aws_customers_conn' > ) > {code} > but it actually behaves oppositely: > {code} > In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator > In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3').execute(None) > [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe'] > In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3', delimiter='.csv').execute(None) > [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): 
bkt0.s3.amazonaws.com > [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[3]: ['1.txt', '2.jpg', '3.exe'] > {code} > This is because that the 'delimiter' parameter is for representing path > hierarchy (so '/' is used typically), not file extension. Also > S3ToGoogleCloudStorageOperator has the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2382] Fix wrong description for delimiter
Repository: incubator-airflow Updated Branches: refs/heads/master 36193fc74 -> ae63246a1 [AIRFLOW-2382] Fix wrong description for delimiter Fix misleading descriptions for the 'delimiter' parameter in S3ListOperator and S3ToGoogleCloudStorageOperator's docstring. Closes #3270 from sekikn/AIRFLOW-2382 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae63246a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae63246a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae63246a Branch: refs/heads/master Commit: ae63246a1d580d77c366276300139165e4c984de Parents: 36193fc Author: Kengo SekiAuthored: Thu Apr 26 18:45:12 2018 -0700 Committer: r39132 Committed: Thu Apr 26 18:45:12 2018 -0700 -- airflow/contrib/operators/s3_list_operator.py | 12 +--- airflow/contrib/operators/s3_to_gcs_operator.py | 6 ++ 2 files changed, 7 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_list_operator.py -- diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index 1dcbc60..dbb45fe 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -24,8 +24,7 @@ from airflow.utils.decorators import apply_defaults class S3ListOperator(BaseOperator): """ -List all objects from the bucket with the given string prefix and delimiter -in name. +List all objects from the bucket with the given string prefix in name. This operator returns a python list with the name of objects which can be used by `xcom` in the downstream task. @@ -35,22 +34,21 @@ class S3ListOperator(BaseOperator): :param prefix: Prefix string to filters the objects whose name begin with such prefix :type prefix: string -:param delimiter: The delimiter by which you want to filter the objects. 
-For e.g to lists the CSV files from in a directory in S3 you would use -delimiter='.csv'. +:param delimiter: the delimiter marks key hierarchy. :type delimiter: string :param aws_conn_id: The connection ID to use when connecting to S3 storage. :type aws_conn_id: string **Example**: -The following operator would list all the CSV files from the S3 +The following operator would list all the files +(excluding subfolders) from the S3 ``customers/2018/04/`` key in the ``data`` bucket. :: s3_file = S3ListOperator( task_id='list_3s_files', bucket='data', prefix='customers/2018/04/', -delimiter='.csv', +delimiter='/', aws_conn_id='aws_customers_conn' ) """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_to_gcs_operator.py -- diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py index d105596..5a2004d 100644 --- a/airflow/contrib/operators/s3_to_gcs_operator.py +++ b/airflow/contrib/operators/s3_to_gcs_operator.py @@ -37,12 +37,10 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator): :param prefix: Prefix string which filters objects whose name begin with such prefix. :type prefix: string -:param delimiter: The delimiter by which you want to filter the objects on. -E.g. to list CSV files from a S3 key you would do the following, -`delimiter='.csv'`. +:param delimiter: the delimiter marks key hierarchy. :type delimiter: string :param aws_conn_id: The source S3 connection -:type aws_conn_id: str +:type aws_conn_id: string :param dest_gcs_conn_id: The destination connection ID to use when connecting to Google Cloud Storage. :type dest_gcs_conn_id: string
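The semantics the fix documents can be illustrated without a real S3 call. This pure-Python emulation of ListObjects (an approximation of the service behavior, not boto code) rolls any key containing the delimiter after the prefix into a "common prefix" instead of listing it, which reproduces the surprising output from the issue:

```python
# delimiter marks key hierarchy; it is NOT a file-extension filter.
def list_keys(keys, prefix="", delimiter=""):
    results, common = [], set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            # Everything up to and including the first delimiter is grouped
            # into a common prefix, like a "subfolder".
            common.add(prefix + rest.split(delimiter)[0] + delimiter)
        else:
            results.append(key)
    return results, sorted(common)

# Reproduces the issue's result: delimiter='.csv' *excludes* the CSV files
# (they become common prefixes) rather than selecting them.
print(list_keys(["0.csv", "1.txt", "2.jpg", "3.exe"], delimiter=".csv"))
# -> (['1.txt', '2.jpg', '3.exe'], ['0.csv'])
```

With `delimiter='/'` the same function lists only top-level files and groups subfolder keys, which is what the corrected docstring example describes.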
[jira] [Updated] (AIRFLOW-2384) Flask 0.12.3+ breaks Airflow webserver
[ https://issues.apache.org/jira/browse/AIRFLOW-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Hamlin updated AIRFLOW-2384: - Description: Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes to Airflows webserver {code:java} File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 135, in handle self.handle_request(listener, req, client, addr) File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 176, in handle_request respiter = self.wsgi(environ, resp.start_response) File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in __call__ return app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in __call__ return self.wsgi_app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in wsgi_app ctx.push() File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push self.session = self.app.open_session(self.request) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in open_session return self.session_interface.open_session(self, request) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in open_session s = self.get_signing_serializer(app) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in get_signing_serializer signer_kwargs=signer_kwargs) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in __init__ self.is_text_serializer = is_text_serializer(serializer) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in is_text_serializer return isinstance(serializer.dumps({}), text_type) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in dumps return json.dumps(_tag(value), separators=(',', ':')) File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, in dumps _dump_arg_defaults(kwargs) File 
"/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, in _dump_arg_defaults bp.json_encoder if bp and bp.json_encoder AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code} was: Flask 12.3.0 and 1.0.0 were released about an hour ago with breaking changes to Airflows webserver {code:java} File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 135, in handle self.handle_request(listener, req, client, addr) File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 176, in handle_request respiter = self.wsgi(environ, resp.start_response) File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in __call__ return app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in __call__ return self.wsgi_app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in wsgi_app ctx.push() File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push self.session = self.app.open_session(self.request) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in open_session return self.session_interface.open_session(self, request) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in open_session s = self.get_signing_serializer(app) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in get_signing_serializer signer_kwargs=signer_kwargs) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in __init__ self.is_text_serializer = is_text_serializer(serializer) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in is_text_serializer return isinstance(serializer.dumps({}), text_type) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in dumps return json.dumps(_tag(value), separators=(',', ':')) File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, 
in dumps _dump_arg_defaults(kwargs) File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, in _dump_arg_defaults bp.json_encoder if bp and bp.json_encoder AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code} > Flask 0.12.3+ breaks Airflow webserver > -- > > Key: AIRFLOW-2384 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2384 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: 1.10.0 >Reporter: Kyle Hamlin >Priority: Critical > Fix For: 1.10.0 > > > Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes > to Airflows webserver > {code:java} > File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line > 135, in handle > self.handle_request(listener, req,
[jira] [Created] (AIRFLOW-2384) Flask 0.12.3+ breaks Airflow webserver
Kyle Hamlin created AIRFLOW-2384: Summary: Flask 0.12.3+ breaks Airflow webserver Key: AIRFLOW-2384 URL: https://issues.apache.org/jira/browse/AIRFLOW-2384 Project: Apache Airflow Issue Type: Bug Components: webserver Affects Versions: 1.10.0 Reporter: Kyle Hamlin Fix For: 1.10.0 Flask 0.12.3 and 1.0.0 were released about an hour ago with breaking changes to Airflow's webserver {code:java} File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 135, in handle self.handle_request(listener, req, client, addr) File "/usr/local/lib/python3.6/site-packages/gunicorn/workers/sync.py", line 176, in handle_request respiter = self.wsgi(environ, resp.start_response) File "/usr/local/lib/python3.6/site-packages/werkzeug/wsgi.py", line 826, in __call__ return app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1997, in __call__ return self.wsgi_app(environ, start_response) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1978, in wsgi_app ctx.push() File "/usr/local/lib/python3.6/site-packages/flask/ctx.py", line 332, in push self.session = self.app.open_session(self.request) File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 912, in open_session return self.session_interface.open_session(self, request) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 324, in open_session s = self.get_signing_serializer(app) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 321, in get_signing_serializer signer_kwargs=signer_kwargs) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 519, in __init__ self.is_text_serializer = is_text_serializer(serializer) File "/usr/local/lib/python3.6/site-packages/itsdangerous.py", line 69, in is_text_serializer return isinstance(serializer.dumps({}), text_type) File "/usr/local/lib/python3.6/site-packages/flask/sessions.py", line 85, in dumps return json.dumps(_tag(value), separators=(',', ':')) File 
"/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 177, in dumps _dump_arg_defaults(kwargs) File "/usr/local/lib/python3.6/site-packages/flask/json/__init__.py", line 98, in _dump_arg_defaults bp.json_encoder if bp and bp.json_encoder AttributeError: 'Blueprint' object has no attribute 'json_encoder'{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
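Until a version pin lands, one mitigation is to fail fast at startup when a known-bad Flask is installed. This is a sketch, with the version bounds taken from this report (0.12.3 and 1.0.0), not from any official compatibility matrix:

```python
# Guard sketch: refuse to start against Flask releases reported broken here.
def parse_version(v):
    """Turn '0.12.3' into (0, 12, 3) for tuple comparison."""
    return tuple(int(p) for p in v.split(".")[:3])

def flask_is_known_bad(version):
    # 0.12.3 and everything newer (including 1.0.0) was reported to break
    # the webserver with the Blueprint json_encoder AttributeError.
    return parse_version(version) >= (0, 12, 3)
```

In practice the guard would check `flask.__version__` before the gunicorn workers boot; the durable fix is pinning the dependency in setup.py.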
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454925#comment-16454925 ] Kevin Yang commented on AIRFLOW-2363: - [~b11c] If set_context is not being called properly, then this will not resolve AIRFLOW-2379. The entry point of the task handler's set_context() method should be here: [https://github.com/apache/incubator-airflow/blob/master/airflow/bin/cli.py#L460], which is called in every `airflow run` command. Not sure why it could be missed. I suspect the log is not being uploaded because of the bug being fixed in this issue, but I might need your help to confirm that. > S3 remote logging appending tuple instead of str > > > Key: AIRFLOW-2363 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2363 > Project: Apache Airflow > Issue Type: Bug > Components: logging >Reporter: Kyle Hamlin >Priority: Major > Fix For: 1.10.0 > > > A recent merge into master that added support for Elasticsearch logging seems > to have broken S3 logging by returning a tuple instead of a string. 
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128] > > following errors thrown: > > *Session NoneType error* > Traceback (most recent call last): > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py", > line 171, in s3_write > encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'), > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 274, in load_string > encrypt=encrypt) > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 313, in load_bytes > client = self.get_conn() > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 34, in get_conn > return self.get_client_type('s3') > File > "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", > line 151, in get_client_type > session, endpoint_url = self._get_credentials(region_name) > File > "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", > line 97, in _get_credentials > connection_object = self.get_connection(self.aws_conn_id) > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", > line 82, in get_connection > conn = random.choice(cls.get_connections(conn_id)) > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", > line 77, in get_connections > conns = cls._get_connections_from_db(conn_id) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 72, in wrapper > with create_session() as session: > File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__ > return next(self.gen) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 41, in create_session > session = settings.Session() > TypeError: 'NoneType' object is not callable > > *TypeError must be str not tuple* > [2018-04-16 18:37:28,200] ERROR in app: Exception on > /admin/airflow/get_logs_with_metadata [GET] > 
Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in > wsgi_app > response = self.full_dispatch_request() > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in > full_dispatch_request > rv = self.handle_user_exception(e) > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in > handle_user_exception > reraise(exc_type, exc_value, tb) > File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, > in reraise > raise value > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in > full_dispatch_request > rv = self.dispatch_request() > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in > dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line > 69, in inner > return self._run_view(f, *args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line > 368, in _run_view > return fn(self, *args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in > decorated_view > return func(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line > 269, in wrapper > return f(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 74, in wrapper > return func(*args, **kwargs) > File
[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454914#comment-16454914 ] Berislav Lopac edited comment on AIRFLOW-2363 at 4/26/18 9:39 PM: -- Is there any chance that this might also resolve AIRFLOW-2379? was (Author: b11c): Is there any chance that this might also resolve AIRFLOW-2375?
> S3 remote logging appending tuple instead of str
> ------------------------------------------------
>
>     Key: AIRFLOW-2363
>     URL: https://issues.apache.org/jira/browse/AIRFLOW-2363
>     Project: Apache Airflow
>     Issue Type: Bug
>     Components: logging
>     Reporter: Kyle Hamlin
>     Priority: Major
>     Fix For: 1.10.0
>
> A recent merge into master that added support for Elasticsearch logging seems
> to have broken S3 logging by returning a tuple instead of a string.
> [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128]
>
> The following errors are thrown:
>
> *Session NoneType error*
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py", line 171, in s3_write
>     encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
>   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", line 274, in load_string
>     encrypt=encrypt)
>   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", line 313, in load_bytes
>     client = self.get_conn()
>   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", line 34, in get_conn
>     return self.get_client_type('s3')
>   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", line 151, in get_client_type
>     session, endpoint_url = self._get_credentials(region_name)
>   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", line 97, in _get_credentials
>     connection_object = self.get_connection(self.aws_conn_id)
>   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", line 82, in get_connection
>     conn = random.choice(cls.get_connections(conn_id))
>   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", line 77, in get_connections
>     conns = cls._get_connections_from_db(conn_id)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 72, in wrapper
>     with create_session() as session:
>   File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
>     return next(self.gen)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 41, in create_session
>     session = settings.Session()
> TypeError: 'NoneType' object is not callable
>
> *TypeError must be str not tuple*
> [2018-04-16 18:37:28,200] ERROR in app: Exception on /admin/airflow/get_logs_with_metadata [GET]
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
>     raise value
>   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
>     return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
>     return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755, in decorated_view
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 269, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 770, in get_logs_with_metadata
>     logs, metadatas = handler.read(ti, try_number, metadata=metadata)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py", line 165, in read
>     logs[i] += log
> TypeError: must be str, not tuple
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454914#comment-16454914 ] Berislav Lopac commented on AIRFLOW-2363: - Is there any chance that this might also resolve AIRFLOW-2375? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2380] Add support for environment variables in Spark submit operator.
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0d0d0a04 -> 36193fc74

[AIRFLOW-2380] Add support for environment variables in Spark submit operator.

Closes #3268 from piffall/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/36193fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/36193fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/36193fc7

Branch: refs/heads/master
Commit: 36193fc7449ca67c807b54ad17a086b35c0c4471
Parents: b0d0d0a
Author: Cristòfol Torrens
Authored: Thu Apr 26 14:21:21 2018 -0700
Committer: Arthur Wiedmer
Committed: Thu Apr 26 14:21:21 2018 -0700

--
 airflow/contrib/hooks/spark_submit_hook.py     | 29 +-
 .../contrib/operators/spark_submit_operator.py | 10 +++-
 tests/contrib/hooks/test_spark_submit_hook.py  | 59 +++-
 3 files changed, 91 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/36193fc7/airflow/contrib/hooks/spark_submit_hook.py
--
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 71c68c0..0185cab 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-#
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -80,6 +80,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     :type num_executors: int
     :param application_args: Arguments for the application being submitted
     :type application_args: list
+    :param env_vars: Environment variables for spark-submit. It
+        supports yarn and k8s mode too.
+    :type env_vars: dict
     :param verbose: Whether to pass the verbose flag to spark-submit
         process for debugging
     :type verbose: bool
     """
@@ -103,6 +106,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                  name='default-name',
                  num_executors=None,
                  application_args=None,
+                 env_vars=None,
                  verbose=False):
         self._conf = conf
         self._conn_id = conn_id
@@ -123,6 +127,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         self._name = name
         self._num_executors = num_executors
         self._application_args = application_args
+        self._env_vars = env_vars
         self._verbose = verbose
         self._submit_sp = None
         self._yarn_application_id = None
@@ -209,6 +214,20 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         if self._conf:
             for key in self._conf:
                 connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
+        if self._env_vars and (self._is_kubernetes or self._is_yarn):
+            if self._is_yarn:
+                tmpl = "spark.yarn.appMasterEnv.{}={}"
+            else:
+                tmpl = "spark.kubernetes.driverEnv.{}={}"
+            for key in self._env_vars:
+                connection_cmd += [
+                    "--conf",
+                    tmpl.format(key, str(self._env_vars[key]))]
+        elif self._env_vars and self._connection['deploy_mode'] != "cluster":
+            self._env = self._env_vars  # Do it on Popen of the process
+        elif self._env_vars and self._connection['deploy_mode'] == "cluster":
+            raise AirflowException(
+                "SparkSubmitHook env_vars is not supported in standalone-cluster mode.")
         if self._is_kubernetes:
             connection_cmd += ["--conf", "spark.kubernetes.namespace={}".format(
                 self._connection['namespace'])]
@@ -294,6 +313,12 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         """
         spark_submit_cmd = self._build_spark_submit_command(application)
+
+        if hasattr(self, '_env'):
+            env = os.environ.copy()
+            env.update(self._env)
+            kwargs["env"] = env
+
         self._submit_sp = subprocess.Popen(spark_submit_cmd,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT,
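For yarn and k8s, the patch turns each entry of `env_vars` into a `--conf` flag. A simplified standalone sketch of that mapping (the hook builds the list inline; the function name here is illustrative):

```python
def env_vars_to_conf(env_vars, is_yarn=False, is_kubernetes=False):
    """Map env_vars to spark-submit --conf flags for yarn/k8s cluster modes."""
    if is_yarn:
        tmpl = "spark.yarn.appMasterEnv.{}={}"
    elif is_kubernetes:
        tmpl = "spark.kubernetes.driverEnv.{}={}"
    else:
        return []  # client mode: handled via the subprocess environment instead
    cmd = []
    for key, value in env_vars.items():
        cmd += ["--conf", tmpl.format(key, str(value))]
    return cmd

print(env_vars_to_conf({"HADOOP_CONF_DIR": "/etc/hadoop/conf"}, is_yarn=True))
# ['--conf', 'spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/etc/hadoop/conf']
```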
[jira] [Resolved] (AIRFLOW-2380) Add support for environment variables in Spark submit operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arthur Wiedmer resolved AIRFLOW-2380. - Resolution: Fixed Fix Version/s: 1.10.0 Issue resolved by pull request #3268 [https://github.com/apache/incubator-airflow/pull/3268]
> Add support for environment variables in Spark submit operator
> --------------------------------------------------------------
>
>     Key: AIRFLOW-2380
>     URL: https://issues.apache.org/jira/browse/AIRFLOW-2380
>     Project: Apache Airflow
>     Issue Type: Improvement
>     Components: contrib, operators
>     Reporter: Cristòfol Torrens
>     Assignee: Cristòfol Torrens
>     Priority: Minor
>     Fix For: 1.10.0
>
> Add support for environment variables in the Spark submit operator.
> For example, to pass *HADOOP_CONF_DIR* when using the same Spark cluster
> with multiple HDFS instances.
> The idea is to pass them as a dict and resolve them for *yarn*-_client/cluster_
> and *standalone*-_client_ mode.
> In *standalone*-_cluster_ mode it is not possible to do this.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Yang closed AIRFLOW-2383. --- Resolution: Invalid
> Escape colon in partition name when poking inside NamedHivePartitionSensor
> --------------------------------------------------------------------------
>
>     Key: AIRFLOW-2383
>     URL: https://issues.apache.org/jira/browse/AIRFLOW-2383
>     Project: Apache Airflow
>     Issue Type: Bug
>     Reporter: Kevin Yang
>     Assignee: Kevin Yang
>     Priority: Major
>
> NamedHivePartitionSensor does not escape colons in the partition name,
> causing behavior that differs from HivePartitionSensor when the partition
> name contains a colon. Colons need to be escaped as `%3A`.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454840#comment-16454840 ] Kevin Yang commented on AIRFLOW-2383: - When using partition names, users are supposed to specify escaped values, closing the JIRA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
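Since callers are expected to pass pre-escaped partition names, the `%3A` form can be produced with the standard library's `urllib.parse.quote`; the partition value below is hypothetical:

```python
from urllib.parse import quote

partition = "dt=2018-04-26T03:00:00"  # a partition spec containing colons

# quote() percent-encodes ':' as '%3A' by default; keep '=' and '/'
# literal so the name=value shape of the partition spec is preserved.
escaped = quote(partition, safe="=/")
print(escaped)  # dt=2018-04-26T03%3A00%3A00
```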
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454772#comment-16454772 ] James Davidheiser commented on AIRFLOW-2363: I tested again after updating to work around some other bugs and updating to match the latest suggested celery configuration options, and confirmed that the change in [https://github.com/apache/incubator-airflow/pull/3259] works for me.
[jira] [Assigned] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Davidheiser reassigned AIRFLOW-2363: -- Assignee: (was: James Davidheiser) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Davidheiser reassigned AIRFLOW-2363: -- Assignee: James Davidheiser (was: Kevin Yang) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-2383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Yang reassigned AIRFLOW-2383: --- Assignee: Kevin Yang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2383) Escape colon in partition name when poking inside NamedHivePartitionSensor
Kevin Yang created AIRFLOW-2383: --- Summary: Escape colon in partition name when poking inside NamedHivePartitionSensor Key: AIRFLOW-2383 URL: https://issues.apache.org/jira/browse/AIRFLOW-2383 Project: Apache Airflow Issue Type: Bug Reporter: Kevin Yang NamedHivePartitionSensor does not escape colons in the partition name, causing behavior that differs from HivePartitionSensor when the partition name contains a colon. Colons need to be escaped as `%3A`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2374) Airflow fails to show logs
[ https://issues.apache.org/jira/browse/AIRFLOW-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454663#comment-16454663 ] Kevin Yang commented on AIRFLOW-2374: - Hi [~b11c], I think this bug is handled in the following JIRA: https://issues.apache.org/jira/browse/AIRFLOW-2363
> Airflow fails to show logs
> --------------------------
>
>     Key: AIRFLOW-2374
>     URL: https://issues.apache.org/jira/browse/AIRFLOW-2374
>     Project: Apache Airflow
>     Issue Type: Bug
>     Reporter: Berislav Lopac
>     Assignee: Berislav Lopac
>     Priority: Blocker
>
> When viewing a log in the webserver, the page shows a loading gif and the log
> never appears. Looking in the Javascript console, the problem appears to be
> error 500 when loading the {{get_logs_with_metadata}} endpoint, giving the
> following trace:
> {code:java}
> [Airflow error-page ASCII-art banner omitted]
> ---
> Node: airflow-nods-dev
> ---
> Traceback (most recent call last):
>   File "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 113, in _read
>     remote_log = self.gcs_read(remote_loc)
>   File "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line 131, in gcs_read
>     return self.hook.download(bkt, blob).decode()
>   File "/opt/airflow/src/apache-airflow/airflow/contrib/hooks/gcs_hook.py", line 107, in download
>     .get_media(bucket=bucket, object=object) \
>   File "/usr/local/lib/python3.6/dist-packages/oauth2client/_helpers.py", line 133, in positional_wrapper
>     return wrapped(*args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/googleapiclient/http.py", line 841, in execute
>     raise HttpError(resp, content, uri=self.uri)
> googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/storage/v1/b/bucket-af/o/test-logs%2Fgeneric_transfer_single%2Ftransfer_file%2F2018-04-25T13%3A00%3A51.250983%2B00%3A00%2F1.log?alt=media returned "Not Found">
>
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in reraise
>     raise value
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, in inner
>     return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 368, in _run_view
>     return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/dist-packages/flask_login.py", line 758, in decorated_view
>     return func(*args, **kwargs)
>   File "/opt/airflow/src/apache-airflow/airflow/www/utils.py", line 269, in wrapper
>     return f(*args, **kwargs)
>   File
[jira] [Assigned] (AIRFLOW-2382) Fix wrong description for delimiter
[ https://issues.apache.org/jira/browse/AIRFLOW-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kengo Seki reassigned AIRFLOW-2382: --- Assignee: Kengo Seki > Fix wrong description for delimiter > --- > > Key: AIRFLOW-2382 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2382 > Project: Apache Airflow > Issue Type: Bug > Components: aws, operators >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > > The document for S3ListOperator says: > {code} > :param delimiter: The delimiter by which you want to filter the objects. > For e.g to lists the CSV files from in a directory in S3 you would use > delimiter='.csv'. > {code} > {code} > **Example**: > The following operator would list all the CSV files from the S3 > ``customers/2018/04/`` key in the ``data`` bucket. :: > s3_file = S3ListOperator( > task_id='list_3s_files', > bucket='data', > prefix='customers/2018/04/', > delimiter='.csv', > aws_conn_id='aws_customers_conn' > ) > {code} > but it actually behaves oppositely: > {code} > In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator > In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3').execute(None) > [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe'] > In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', > aws_conn_id='s3', delimiter='.csv').execute(None) > [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.amazonaws.com > [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): 
bkt0.s3-ap-northeast-1.amazonaws.com > [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS > connection (1): bkt0.s3.ap-northeast-1.amazonaws.com > Out[3]: ['1.txt', '2.jpg', '3.exe'] > {code} > This is because the 'delimiter' parameter represents path > hierarchy (so '/' is typically used), not a file extension. > S3ToGoogleCloudStorageOperator has the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
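The behaviour Kengo reports matches the S3 ListObjects semantics: with a delimiter set, any key whose remainder after the prefix contains the delimiter is rolled up into a "common prefix" and dropped from the plain key listing, which is why delimiter='.csv' hides the CSV files instead of selecting them. A minimal, hypothetical simulation of that rollup rule (plain Python, no AWS calls; `list_keys` is an illustrative name, not an Airflow or boto3 function):

```python
def list_keys(keys, prefix="", delimiter=""):
    """Simplified model of S3 ListObjects: return keys under `prefix`
    whose remainder does not contain `delimiter`; keys that do contain
    it are rolled up into common prefixes and omitted from the listing."""
    result = []
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            continue  # grouped under a common prefix, not listed as a key
        result.append(key)
    return result

keys = ["0.csv", "1.txt", "2.jpg", "3.exe"]
# delimiter='.csv' excludes the .csv files, mirroring the Out[3] above:
print(list_keys(keys, delimiter=".csv"))  # ['1.txt', '2.jpg', '3.exe']
# The typical use is '/' to list only the top level of a hierarchy:
print(list_keys(["a/b.csv", "top.csv"], delimiter="/"))  # ['top.csv']
```

This is why the docstring's example is misleading: filtering by extension would need a suffix check on the returned keys, not a delimiter.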
[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454591#comment-16454591 ] Kyle Hamlin edited comment on AIRFLOW-2363 at 4/26/18 5:31 PM: --- [~yrqls21] Just testing this looks like its working now – Kyle Hamlin was (Author: hamlinkn): +Kevin Yang (JIRA)Just testing this look like it working now -- Kyle Hamlin > S3 remote logging appending tuple instead of str > > > Key: AIRFLOW-2363 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2363 > Project: Apache Airflow > Issue Type: Bug > Components: logging >Reporter: Kyle Hamlin >Assignee: Kevin Yang >Priority: Major > Fix For: 1.10.0 > > > A recent merge into master that added support for Elasticsearch logging seems > to have broken S3 logging by returning a tuple instead of a string. > [https://github.com/apache/incubator-airflow/commit/ec38ba9594395de04ec932481212a86fbe9ae107#diff-0442332ecbe42ebbf426911c68d8cd4aR128] > > following errors thrown: > > *Session NoneType error* > Traceback (most recent call last): > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py", > line 171, in s3_write > encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'), > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 274, in load_string > encrypt=encrypt) > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 313, in load_bytes > client = self.get_conn() > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py", > line 34, in get_conn > return self.get_client_type('s3') > File > "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", > line 151, in get_client_type > session, endpoint_url = self._get_credentials(region_name) > File > "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py", > line 97, in _get_credentials > connection_object = self.get_connection(self.aws_conn_id) > File 
"/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", > line 82, in get_connection > conn = random.choice(cls.get_connections(conn_id)) > File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", > line 77, in get_connections > conns = cls._get_connections_from_db(conn_id) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 72, in wrapper > with create_session() as session: > File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__ > return next(self.gen) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 41, in create_session > session = settings.Session() > TypeError: 'NoneType' object is not callable > > *TypeError must be str not tuple* > [2018-04-16 18:37:28,200] ERROR in app: Exception on > /admin/airflow/get_logs_with_metadata [GET] > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in > wsgi_app > response = self.full_dispatch_request() > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in > full_dispatch_request > rv = self.handle_user_exception(e) > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in > handle_user_exception > reraise(exc_type, exc_value, tb) > File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, > in reraise > raise value > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in > full_dispatch_request > rv = self.dispatch_request() > File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in > dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line > 69, in inner > return self._run_view(f, *args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line > 368, in _run_view > return fn(self, *args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 
755, in > decorated_view > return func(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line > 269, in wrapper > return f(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line > 74, in wrapper > return func(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line > 770, in get_logs_with_metadata > logs, metadatas = handler.read(ti, try_number, metadata=metadata) > File >
[jira] [Resolved] (AIRFLOW-2377) Improve Sendgrid sender support
[ https://issues.apache.org/jira/browse/AIRFLOW-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand resolved AIRFLOW-2377. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3266 [https://github.com/apache/incubator-airflow/pull/3266] > Improve Sendgrid sender support > --- > > Key: AIRFLOW-2377 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2377 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Reporter: Marcin Szymanski >Assignee: Marcin Szymanski >Priority: Minor > Fix For: 2.0.0 > > > * Add support for for sender name > * Allow passing sender email and name via kwargs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454591#comment-16454591 ] Kyle Hamlin commented on AIRFLOW-2363: -- +Kevin Yang (JIRA)Just testing this look like it working now -- Kyle Hamlin -- This message was sent by Atlassian JIRA (v7.6.3#76005)
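The `TypeError: must be str, not tuple` at the bottom of the trace follows directly from the linked commit: a log reader that used to return a plain string started returning a (text, metadata) pair, while the caller still concatenates the result onto a string. A minimal sketch of the bug pattern and its fix (hypothetical function names, not the actual Airflow handler code):

```python
def read_log_new(loc):
    # After the Elasticsearch change: returns (text, metadata)
    # instead of the plain string the caller still expects.
    return "log line\n", {"end_of_log": True}

logs = [""]
try:
    logs[0] += read_log_new("s3://bucket/1.log")  # str += tuple
except TypeError as e:
    # In Python 3.6 this surfaced as "must be str, not tuple"
    print(e)

# The fix is to unpack the pair before appending:
text, metadata = read_log_new("s3://bucket/1.log")
logs[0] += text
```

The `Session NoneType` error above is a separate symptom: the S3 hook is constructed before `settings.Session` is configured in the task-log process, so the connection lookup fails.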
[jira] [Commented] (AIRFLOW-2377) Improve Sendgrid sender support
[ https://issues.apache.org/jira/browse/AIRFLOW-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454592#comment-16454592 ] ASF subversion and git services commented on AIRFLOW-2377: -- Commit b0d0d0a041bd5c4b19f6b97dc0fa289436743272 in incubator-airflow's branch refs/heads/master from [~ms32035] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=b0d0d0a ] [AIRFLOW-2377] Improve Sendgrid sender support Closes #3266 from ms32035/sendgrid_sender > Improve Sendgrid sender support > --- > > Key: AIRFLOW-2377 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2377 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Reporter: Marcin Szymanski >Assignee: Marcin Szymanski >Priority: Minor > Fix For: 2.0.0 > > > * Add support for for sender name > * Allow passing sender email and name via kwargs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2377] Improve Sendgrid sender support
Repository: incubator-airflow Updated Branches: refs/heads/master e44688ed0 -> b0d0d0a04 [AIRFLOW-2377] Improve Sendgrid sender support Closes #3266 from ms32035/sendgrid_sender Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0d0d0a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0d0d0a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0d0d0a0 Branch: refs/heads/master Commit: b0d0d0a041bd5c4b19f6b97dc0fa289436743272 Parents: e44688e Author: Marcin SzymanskiAuthored: Thu Apr 26 10:25:32 2018 -0700 Committer: r39132 Committed: Thu Apr 26 10:25:32 2018 -0700 -- airflow/contrib/utils/sendgrid.py| 8 +--- tests/contrib/utils/test_sendgrid.py | 31 +++ 2 files changed, 32 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0d0d0a0/airflow/contrib/utils/sendgrid.py -- diff --git a/airflow/contrib/utils/sendgrid.py b/airflow/contrib/utils/sendgrid.py index 206bb93..9055c97 100644 --- a/airflow/contrib/utils/sendgrid.py +++ b/airflow/contrib/utils/sendgrid.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -51,7 +51,9 @@ def send_email(to, subject, html_content, files=None, SENDGRID_API_KEY={your-sendgrid-api-key}. 
""" mail = Mail() -mail.from_email = Email(os.environ.get('SENDGRID_MAIL_FROM')) +from_email = kwargs.get('from_email') or os.environ.get('SENDGRID_MAIL_FROM') +from_name = kwargs.get('from_name') or os.environ.get('SENDGRID_MAIL_SENDER') +mail.from_email = Email(from_email, from_name) mail.subject = subject # Add the recipient list of to emails. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0d0d0a0/tests/contrib/utils/test_sendgrid.py -- diff --git a/tests/contrib/utils/test_sendgrid.py b/tests/contrib/utils/test_sendgrid.py index 997bc55..6710076 100644 --- a/tests/contrib/utils/test_sendgrid.py +++ b/tests/contrib/utils/test_sendgrid.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,17 +50,27 @@ class SendEmailSendGridTest(unittest.TestCase): 'subject': 'sendgrid-send-email unit test'} self.personalization_custom_args = {'arg1': 'val1', 'arg2': 'val2'} self.categories = ['cat1', 'cat2'] +# extras self.expected_mail_data_extras = copy.deepcopy(self.expected_mail_data) self.expected_mail_data_extras['personalizations'][0]['custom_args'] = \ self.personalization_custom_args self.expected_mail_data_extras['categories'] = self.categories +self.expected_mail_data_extras['from'] = \ +{'name': 'Foo', 'email': 'f...@bar.com'} +# sender +self.expected_mail_data_sender = copy.deepcopy(self.expected_mail_data) +self.expected_mail_data_sender['from'] = \ +{'name': 'Foo Bar', 'email': 'f...@foo.bar'} # Test the right email is constructed. 
@mock.patch('os.environ.get') @mock.patch('airflow.contrib.utils.sendgrid._post_sendgrid_mail') def test_send_email_sendgrid_correct_email(self, mock_post, mock_get): -mock_get.return_value = 'f...@bar.com' +def get_return(var): +return {'SENDGRID_MAIL_FROM': 'f...@bar.com'}.get(var) + +mock_get.side_effect = get_return send_email(self.to, self.subject, self.html_content, cc=self.cc, bcc=self.bcc) mock_post.assert_called_with(self.expected_mail_data) @@ -68,8 +78,21 @@ class SendEmailSendGridTest(unittest.TestCase): @mock.patch('os.environ.get') @mock.patch('airflow.contrib.utils.sendgrid._post_sendgrid_mail') def test_send_email_sendgrid_correct_email_extras(self, mock_post, mock_get): -mock_get.return_value = 'f...@bar.com' +def get_return(var): +
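The core of the patch is a kwarg-or-environment fallback for the sender: an explicit `from_email`/`from_name` argument wins, otherwise the `SENDGRID_MAIL_FROM` / `SENDGRID_MAIL_SENDER` environment variables are used. A simplified sketch of that resolution logic (the real code passes the result to sendgrid's `Email()` helper; `resolve_sender` is an illustrative name):

```python
import os

def resolve_sender(**kwargs):
    """Prefer explicit kwargs, fall back to environment variables."""
    from_email = kwargs.get('from_email') or os.environ.get('SENDGRID_MAIL_FROM')
    from_name = kwargs.get('from_name') or os.environ.get('SENDGRID_MAIL_SENDER')
    return from_email, from_name

os.environ['SENDGRID_MAIL_FROM'] = 'env@example.com'
os.environ.pop('SENDGRID_MAIL_SENDER', None)
print(resolve_sender())                    # ('env@example.com', None)
print(resolve_sender(from_email='kw@example.com',
                     from_name='Foo Bar'))  # ('kw@example.com', 'Foo Bar')
```

This also explains the test change above: mocking `os.environ.get` with a single `return_value` no longer works once two different variables are read, hence the `side_effect` lookup in the new tests.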
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454533#comment-16454533 ] Kyle Hamlin commented on AIRFLOW-2363: -- Yeah trying it now. Was there a new field added to the task_instance (executor_config) table? -- Kyle Hamlin -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2382) Fix wrong description for delimiter
Kengo Seki created AIRFLOW-2382: --- Summary: Fix wrong description for delimiter Key: AIRFLOW-2382 URL: https://issues.apache.org/jira/browse/AIRFLOW-2382 Project: Apache Airflow Issue Type: Bug Components: aws, operators Reporter: Kengo Seki The document for S3ListOperator says: {code} :param delimiter: The delimiter by which you want to filter the objects. For e.g to lists the CSV files from in a directory in S3 you would use delimiter='.csv'. {code} {code} **Example**: The following operator would list all the CSV files from the S3 ``customers/2018/04/`` key in the ``data`` bucket. :: s3_file = S3ListOperator( task_id='list_3s_files', bucket='data', prefix='customers/2018/04/', delimiter='.csv', aws_conn_id='aws_customers_conn' ) {code} but it actually behaves oppositely: {code} In [1]: from airflow.contrib.operators.s3_list_operator import S3ListOperator In [2]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', aws_conn_id='s3').execute(None) [2018-04-26 10:34:27,001] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3.amazonaws.com [2018-04-26 10:34:27,711] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3-ap-northeast-1.amazonaws.com [2018-04-26 10:34:27,801] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3.ap-northeast-1.amazonaws.com Out[2]: ['0.csv', '1.txt', '2.jpg', '3.exe'] In [3]: S3ListOperator(task_id='t', bucket='bkt0', prefix='', aws_conn_id='s3', delimiter='.csv').execute(None) [2018-04-26 10:34:39,722] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3.amazonaws.com [2018-04-26 10:34:40,483] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3-ap-northeast-1.amazonaws.com [2018-04-26 10:34:40,569] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): bkt0.s3.ap-northeast-1.amazonaws.com Out[3]: ['1.txt', '2.jpg', '3.exe'] {code} This is because that the 'delimiter' parameter is for representing path hierarchy (so '/' 
is typically used), not a file extension. Also S3ToGoogleCloudStorageOperator has the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454406#comment-16454406 ] Kevin Yang commented on AIRFLOW-2363: - [~hamlinkn] Do you mind trying out the change in this PR [https://github.com/apache/incubator-airflow/pull/3259] and seeing if the issue is resolved? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Hamlin updated AIRFLOW-2363: - Comment: was deleted (was: I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and I have been have no problems -- Kyle Hamlin ) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454306#comment-16454306 ] Kyle Hamlin commented on AIRFLOW-2363: -- I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and there have been no problems since. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454306#comment-16454306 ] Kyle Hamlin edited comment on AIRFLOW-2363 at 4/26/18 2:27 PM: --- I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and there have been no problems since was (Author: hamlinkn): I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and I there have been no problems since -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454305#comment-16454305 ] Kyle Hamlin commented on AIRFLOW-2363: -- I reverted all the way back to 39b7d7d87cabae9de02ba5d64b998317b494bdd9 and have had no problems -- Kyle Hamlin -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2381) Fix the Flaky ApiPasswordTests
Fokko Driesprong created AIRFLOW-2381: - Summary: Fix the Flaky ApiPasswordTests Key: AIRFLOW-2381 URL: https://issues.apache.org/jira/browse/AIRFLOW-2381 Project: Apache Airflow Issue Type: Improvement Reporter: Fokko Driesprong Currently the ApiPasswordTests fail because the DAG is not available in the database. I believe this is an issue with the different tests running in parallel. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work started] (AIRFLOW-2380) Add support for environment variables in Spark submit operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2380 started by Cristòfol Torrens. -- > Add support for environment variables in Spark submit operator > -- > > Key: AIRFLOW-2380 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2380 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib, operators >Reporter: Cristòfol Torrens >Assignee: Cristòfol Torrens >Priority: Minor > > Add support for environment variables in the Spark submit operator, for example > to pass *HADOOP_CONF_DIR* when using the same Spark cluster with multiple HDFS installations. > The idea is to pass them as a dict and resolve it when using > *yarn-*_client/cluster_ and *standalone-*_client_ mode. > In *standalone-*_cluster_ mode this is not possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2380) Add support for environment variables in Spark submit operator
Cristòfol Torrens created AIRFLOW-2380: -- Summary: Add support for environment variables in Spark submit operator Key: AIRFLOW-2380 URL: https://issues.apache.org/jira/browse/AIRFLOW-2380 Project: Apache Airflow Issue Type: Improvement Components: contrib, operators Reporter: Cristòfol Torrens Assignee: Cristòfol Torrens Add support for environment variables in the Spark submit operator, for example to pass *HADOOP_CONF_DIR* when using the same Spark cluster with multiple HDFS installations. The idea is to pass them as a dict and resolve it when using *yarn-*_client/cluster_ and *standalone-*_client_ mode. In *standalone-*_cluster_ mode this is not possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
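The resolution described above can be sketched as a small helper. This is an illustrative assumption about how the eventual implementation might behave, not the actual patch: the helper name `resolve_env_vars` and the `spark.yarn.appMasterEnv.*` mapping are hypothetical.

```python
import os


def resolve_env_vars(env_vars, master, deploy_mode):
    # Hypothetical sketch of resolving a user-supplied env_vars dict for a
    # spark-submit invocation, per the ticket's description:
    # - yarn client/cluster: forward the variables as Spark configuration
    #   so they reach the application master
    # - standalone client: apply them to the local submit process environment
    # - standalone cluster: not supported, so fail loudly
    if master.startswith("yarn"):
        return {"spark.yarn.appMasterEnv.{}".format(k): v
                for k, v in env_vars.items()}
    if deploy_mode == "client":
        merged = dict(os.environ)
        merged.update(env_vars)
        return merged
    raise ValueError(
        "env_vars is not supported in standalone-cluster mode")
```

In yarn mode the returned dict would be merged into the spark-submit `--conf` arguments; in standalone-client mode it would be passed as the subprocess environment.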
[jira] [Work started] (AIRFLOW-2222) GoogleCloudStorageHook.copy fails for large files between locations
[ https://issues.apache.org/jira/browse/AIRFLOW-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2222 started by Berislav Lopac. --- > GoogleCloudStorageHook.copy fails for large files between locations > --- > > Key: AIRFLOW-2222 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2222 > Project: Apache Airflow > Issue Type: Bug >Reporter: Berislav Lopac >Assignee: Berislav Lopac >Priority: Major > Fix For: 2.0.0 > > > When copying large files (confirmed for around 3 GB) between buckets in > different projects, the operation fails and the Google API returns error > [413 Payload Too > Large|https://cloud.google.com/storage/docs/json_api/v1/status-codes#413_Payload_Too_Large]. > The documentation for the error says: > {quote}The Cloud Storage JSON API supports up to 5 TB objects. > This error may, alternatively, arise if copying objects between locations > and/or storage classes can not complete within 30 seconds. In this case, use > the > [Rewrite|https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite] > method instead.{quote} > The reason seems to be that {{GoogleCloudStorageHook.copy}} uses the > API {{copy}} method. > h3. Proposed Solution > There are two potential solutions: > # Implement a {{GoogleCloudStorageHook.rewrite}} method that operators and other > objects can call to ensure successful execution. This method > is more flexible but requires changes both in the {{GoogleCloudStorageHook}} > class and in any other classes that use it for copying files, to ensure that they > explicitly call {{rewrite}} when needed. > # Modify {{GoogleCloudStorageHook.copy}} to determine when to use {{rewrite}} > instead of {{copy}} underneath. This requires updating only the > {{GoogleCloudStorageHook}} class, but the logic might not cover all the edge > cases and could be difficult to implement. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
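The second proposed solution amounts to a small decision rule. A sketch under stated assumptions: the helper name `should_use_rewrite` and the 1 GiB size cutoff are illustrative guesses, since the 30-second limit depends on object size and on whether the copy crosses locations or storage classes, not on any documented byte threshold.

```python
def should_use_rewrite(size_bytes, same_location, same_storage_class,
                       large_threshold=2 ** 30):
    # Fall back to the JSON API `rewrite` method when a plain `copy` risks
    # the 413 failure: copies that cross locations or storage classes must
    # finish within 30 seconds, which large objects cannot guarantee.
    # The 1 GiB cutoff is an assumed heuristic, not a documented limit.
    crosses_boundary = not (same_location and same_storage_class)
    return crosses_boundary and size_bytes >= large_threshold
```

When `rewrite` is chosen, the JSON API call must be repeated with the returned `rewriteToken` until the response reports `done`, since one rewrite request may move only part of a large object.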
[jira] [Work started] (AIRFLOW-2374) Airflow fails to show logs
[ https://issues.apache.org/jira/browse/AIRFLOW-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2374 started by Berislav Lopac. --- > Airflow fails to show logs > -- > > Key: AIRFLOW-2374 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2374 > Project: Apache Airflow > Issue Type: Bug >Reporter: Berislav Lopac >Assignee: Berislav Lopac >Priority: Blocker > > When viewing a log in the webserver, the page shows a loading gif and the log > never appears. Looking in the Javascript console, the problem appears to be > error 500 when loading the {{get_logs_with_metadata}} endpoint, giving the > following trace: > {code:java} > [Airflow ASCII-art banner] > --- > Node: airflow-nods-dev > --- > Traceback (most recent call last): > File > "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line > 113, in _read > remote_log = self.gcs_read(remote_loc) > File > "/opt/airflow/src/apache-airflow/airflow/utils/log/gcs_task_handler.py", line > 131, in gcs_read > return self.hook.download(bkt, blob).decode() > File "/opt/airflow/src/apache-airflow/airflow/contrib/hooks/gcs_hook.py", > line 107, in download > .get_media(bucket=bucket, object=object) \ > File "/usr/local/lib/python3.6/dist-packages/oauth2client/_helpers.py", > line 133, in positional_wrapper > return wrapped(*args, **kwargs) > File "/usr/local/lib/python3.6/dist-packages/googleapiclient/http.py", line > 841, in execute > raise 
HttpError(resp, content, uri=self.uri) > googleapiclient.errors.HttpError: https://www.googleapis.com/storage/v1/b/bucket-af/o/test-logs%2Fgeneric_transfer_single%2Ftransfer_file%2F2018-04-25T13%3A00%3A51.250983%2B00%3A00%2F1.log?alt=media > returned "Not Found"> > During handling of the above exception, another exception occurred: > Traceback (most recent call last): > File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in > wsgi_app > response = self.full_dispatch_request() > File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in > full_dispatch_request > rv = self.handle_user_exception(e) > File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in > handle_user_exception > reraise(exc_type, exc_value, tb) > File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in > reraise > raise value > File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in > full_dispatch_request > rv = self.dispatch_request() > File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in > dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, > in inner > return self._run_view(f, *args, **kwargs) > File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line > 368, in _run_view > return fn(self, *args, **kwargs) > File "/usr/local/lib/python3.6/dist-packages/flask_login.py", line 758, in > decorated_view > return func(*args, **kwargs) > File "/opt/airflow/src/apache-airflow/airflow/www/utils.py", line 269, in > wrapper > return f(*args, **kwargs) > File "/opt/airflow/src/apache-airflow/airflow/utils/db.py", line 74, in > wrapper > return func(*args, **kwargs) > File
[jira] [Work started] (AIRFLOW-2361) Allow GoogleCloudStorageToGoogleCloudStorageOperator to store list of copied files to XCom
[ https://issues.apache.org/jira/browse/AIRFLOW-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2361 started by Berislav Lopac. --- > Allow GoogleCloudStorageToGoogleCloudStorageOperator to store list of copied > files to XCom > -- > > Key: AIRFLOW-2361 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2361 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Berislav Lopac >Assignee: Berislav Lopac >Priority: Minor > > When {{GoogleCloudStorageToGoogleCloudStorageOperator}} is used with a > wildcard, it can copy more than one file. It would be useful to have a > mechanism that stores the list of copied files as XCom so it can be used > by other tasks downstream. > Proposed solution: Add an {{xcom_push}} flag argument to the constructor; if > {{True}}, the {{execute}} method returns a list of two-tuples representing > each source/destination file pair: > {code:java} > [ > ("original/file/path.ext", "target/prefix/original/file/path.ext"), > ... > ]{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
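The proposed return value can be sketched as a pure helper that builds the ticket's example output. The name `build_copy_pairs` and the prefix-joining rule are illustrative assumptions about how `execute` might assemble the list when `xcom_push=True`:

```python
def build_copy_pairs(source_paths, destination_prefix):
    # Pair each wildcard-matched source object with its destination object:
    # the destination is the prefix plus the original path, matching the
    # ticket's example. Airflow pushes execute()'s return value to XCom,
    # so downstream tasks could pull this list.
    prefix = destination_prefix.rstrip("/")
    return [(src, "{}/{}".format(prefix, src)) for src in source_paths]
```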
[jira] [Created] (AIRFLOW-2379) Logging: GCSTaskHandler does not upload the logs
Berislav Lopac created AIRFLOW-2379: --- Summary: Logging: GCSTaskHandler does not upload the logs Key: AIRFLOW-2379 URL: https://issues.apache.org/jira/browse/AIRFLOW-2379 Project: Apache Airflow Issue Type: Bug Components: logging Reporter: Berislav Lopac In the current master, when using {{GCSTaskHandler}} the logs are not uploaded to the GCS bucket. I've narrowed it down to the handler not being able to find the local log file in the {{.close}} method, which in turn happens because the method {{.set_context}} is not called in some cases, specifically at the end of the run when the log is supposed to be uploaded. I'll keep exploring the issue when I have the time, but perhaps someone with better understanding of the logging mechanism might be able to provide some insight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
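The lifecycle described above can be sketched with a minimal stand-in. The class and attribute names here are illustrative, not Airflow's real handler API: the point is that if `set_context` never records the local log path, `close` has nothing to upload.

```python
class SketchTaskHandler:
    # Minimal sketch of the upload-on-close lifecycle from the ticket:
    # set_context records where the local log file lives, and close
    # uploads it. If set_context was never called, close silently does
    # nothing, which is the failure mode described in AIRFLOW-2379.
    def __init__(self):
        self.log_relative_path = None
        self.uploaded = False

    def set_context(self, log_relative_path):
        self.log_relative_path = log_relative_path

    def close(self):
        if self.log_relative_path is None:
            # No context was set, so the local log file cannot be found
            # and nothing is uploaded to the GCS bucket.
            return False
        self.uploaded = True
        return True
```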
[jira] [Commented] (AIRFLOW-2363) S3 remote logging appending tuple instead of str
[ https://issues.apache.org/jira/browse/AIRFLOW-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453663#comment-16453663 ] Kevin Yang commented on AIRFLOW-2363: - [~jdavidh], it was a tricky one to debug. I actually think the NoneType error existed even before 5cb530b455be54e6b58eae19c8c10ef8f5cf955d was merged (at least in my naive setup with S3). That error blocks one upload attempt (there are actually multiple attempts, one whenever the S3 task handler is closed); the attempt that was not blocked got removed by 5cb530b455be54e6b58eae19c8c10ef8f5cf955d, and I made a fix for it in the PR. I assume the root cause is that the upload called in atexit() was killed when the subprocess ended, so the upload could not finish (according to my debugging logs). But I believe there is more to this task-handler-closing issue, and it needs some more work to be perfect. I'm going to stop here due to a priority change, but I would be very curious to know all the details if you decide to dig to the end of it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
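The `TypeError: must be str, not tuple` in the tracebacks comes from `file_task_handler.py` concatenating each handler's `_read` result onto a plain string after the Elasticsearch change made `_read` return a `(log, metadata)` tuple. A minimal reproduction of the failure mode, with the obvious unpacking workaround; this is a sketch, not the actual patch:

```python
def accumulate_logs(read_results):
    # file_task_handler.read keeps one accumulated string per try number
    # and does `logs[i] += log` with whatever _read returned. Once _read
    # yields a (log, metadata) tuple instead of a string, `str += tuple`
    # raises "TypeError: must be str, not tuple" unless the log text is
    # unpacked from the tuple first.
    accumulated = ""
    for result in read_results:
        log_text = result[0] if isinstance(result, tuple) else result
        accumulated += log_text
    return accumulated
```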
incubator-airflow git commit: [AIRFLOW-2331] Support init action timeout on dataproc cluster create
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 84cfbf6a1 -> cb264e940


[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master

(cherry picked from commit e44688ed090a438a20751e11cc96c72554630f1d)
Signed-off-by: Fokko Driesprong

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb264e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb264e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb264e94

Branch: refs/heads/v1-10-test
Commit: cb264e940e90a865726ea788e5fd3c2f98f4817e
Parents: 84cfbf6
Author: Cristòfol Torrens
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong
Committed: Thu Apr 26 10:00:17 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 29 ++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++--
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 728fae9..56ebb91 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,10 @@
 import ntpath
 import os
+import re
 import time
 import uuid
+from datetime import timedelta

 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -57,6 +59,9 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param init_actions_uris: List of GCS uri's containing
         dataproc initialization scripts
     :type init_actions_uris: list[string]
+    :param init_action_timeout: Amount of time executable scripts in
+        init_actions_uris has to complete
+    :type init_action_timeout: string
     :param metadata: dict of key-value google compute engine metadata entries
         to add to all instances
     :type metadata: dict
@@ -115,6 +120,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
+                 init_action_timeout="10m",
                  metadata=None,
                  image_version=None,
                  properties=None,
@@ -141,6 +147,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.num_preemptible_workers = num_preemptible_workers
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.image_version = image_version
         self.properties = properties
@@ -206,6 +213,19 @@ class DataprocClusterCreateOperator(BaseOperator):
                 return
             time.sleep(15)

+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(timedelta(minutes=val).seconds)
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
     def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
@@ -276,7 +296,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['properties'] = self.properties
         if self.init_actions_uris:
             init_actions_dict = [
-                {'executableFile': uri} for uri in self.init_actions_uris
+                {
+                    'executableFile': uri,
+                    'executionTimeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
             ]
[jira] [Resolved] (AIRFLOW-2331) Add support for initialization action timeout on dataproc cluster creation.
[ https://issues.apache.org/jira/browse/AIRFLOW-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fokko Driesprong resolved AIRFLOW-2331.
---------------------------------------
       Resolution: Fixed
    Fix Version/s:     (was: Airflow 2.0)
                   2.0.0

Issue resolved by pull request #3235
[https://github.com/apache/incubator-airflow/pull/3235]

> Add support for initialization action timeout on dataproc cluster creation.
> ---------------------------------------------------------------------------
>
>            Key: AIRFLOW-2331
>            URL: https://issues.apache.org/jira/browse/AIRFLOW-2331
>        Project: Apache Airflow
>     Issue Type: Improvement
>     Components: contrib
>       Reporter: Cristòfol Torrens
>       Assignee: Cristòfol Torrens
>       Priority: Minor
>         Labels: contrib, dataproc, operator
>        Fix For: 2.0.0
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Add support to customize timeout for initialization scripts.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2331] Support init action timeout on dataproc cluster create
Repository: incubator-airflow
Updated Branches:
  refs/heads/master dde066d00 -> e44688ed0


[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e44688ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e44688ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e44688ed

Branch: refs/heads/master
Commit: e44688ed090a438a20751e11cc96c72554630f1d
Parents: dde066d
Author: Cristòfol Torrens
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong
Committed: Thu Apr 26 09:59:51 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 29 ++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++--
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

(The patch body of this master commit is identical to that of the cherry-picked commit cb264e94 quoted above, plus the trailing context lines assigning init_actions_dict to cluster_data['config']['initializationActions'].)
[jira] [Commented] (AIRFLOW-2331) Add support for initialization action timeout on dataproc cluster creation.
[ https://issues.apache.org/jira/browse/AIRFLOW-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453630#comment-16453630 ]

Cristòfol Torrens commented on AIRFLOW-2331:
--------------------------------------------

Pull request approved:
https://github.com/apache/incubator-airflow/pull/3235

> Add support for initialization action timeout on dataproc cluster creation.
> ---------------------------------------------------------------------------
>
>            Key: AIRFLOW-2331
>            URL: https://issues.apache.org/jira/browse/AIRFLOW-2331
>        Project: Apache Airflow
>     Issue Type: Improvement
>     Components: contrib
>       Reporter: Cristòfol Torrens
>       Assignee: Cristòfol Torrens
>       Priority: Minor
>         Labels: contrib, dataproc, operator
>        Fix For: Airflow 2.0
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Add support to customize timeout for initialization scripts.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)