[jira] [Created] (AIRFLOW-2528) Airflow cli does not allow disabled stdin
Brent Johnson created AIRFLOW-2528: -- Summary: Airflow cli does not allow disabled stdin Key: AIRFLOW-2528 URL: https://issues.apache.org/jira/browse/AIRFLOW-2528 Project: Apache Airflow Issue Type: Bug Components: cli Affects Versions: 1.9.0 Reporter: Brent Johnson Basically, I am trying to automate regression testing by executing an Airflow DAG. Using the CLI, I can successfully run the following command: {code:java} ./airflow run regression-testing regression-ingestion 2018-05-25{code} I want to trigger this against our staging instance running on AWS, and I figured an easy way to do so would be to use [AWS Systems Manager|https://docs.aws.amazon.com/systems-manager/latest/userguide/run-command.html]. Unfortunately, any airflow command I call through it returns: {code:java} the input device is not a TTY {code} I was able to reproduce this locally by redirecting stdin: {code:java} ./airflow run regression-testing regression-ingestion 2018-05-25 0>/dev/null{code} This is of course an extreme example, but it feels like a bug for a CLI to require stdin to be open. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
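The reporter's underlying point is that a CLI should degrade gracefully when stdin is closed or redirected rather than require a terminal. A minimal sketch of such a guard (illustrative only; the function name `is_interactive` is not Airflow's actual code) that a CLI could consult before prompting interactively:

```python
import io


def is_interactive(stream):
    """Return True only when the stream exists and is attached to a TTY.

    A CLI can use this guard to skip interactive prompts instead of
    failing outright when stdin is piped, redirected, or closed.
    """
    return stream is not None and hasattr(stream, "isatty") and stream.isatty()


# A pipe, a redirected file, or an in-memory stream is not a terminal,
# so an interactive prompt should simply be skipped:
print(is_interactive(io.StringIO()))  # False
print(is_interactive(None))           # False
```

With a check like this, running the command under AWS Systems Manager (where no TTY is allocated) would fall back to non-interactive behavior instead of aborting.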
[jira] [Created] (AIRFLOW-2527) Be able to clear Dag Runs + Tasks from command line
Tao Feng created AIRFLOW-2527: - Summary: Be able to clear Dag Runs + Tasks from command line Key: AIRFLOW-2527 URL: https://issues.apache.org/jira/browse/AIRFLOW-2527 Project: Apache Airflow Issue Type: Bug Reporter: Tao Feng Assignee: Tao Feng Make sure airflow clear could clear both dag runs and tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2526) params can be overridden through CLI
Chao-Han Tsai created AIRFLOW-2526: -- Summary: params can be overridden through CLI Key: AIRFLOW-2526 URL: https://issues.apache.org/jira/browse/AIRFLOW-2526 Project: Apache Airflow Issue Type: New Feature Reporter: Chao-Han Tsai Assignee: Chao-Han Tsai The idea is that the values in {{params}} can be overridden through the CLI. Given a DAG like this: {code:java} dag = DAG('hello_world') templated_command = """ echo "text = {{ params.text }}" """ bash_operator = BashOperator( task_id='bash_task', bash_command=templated_command, dag=dag, params={"text": "this text should be overridden"} ) {code} running {code:java} airflow trigger_dag hello_world -c '{"text": "overridden successfully"}' {code} (note the single quotes, so the shell does not strip the inner double quotes) should print {code} text = overridden successfully {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
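The shell quoting of the {{-c}} payload is the fragile part of the command above. When the command is built programmatically, one way (an illustrative sketch, not part of the ticket) to keep the JSON intact is to serialize the conf with `json.dumps` and quote it with `shlex.quote`:

```python
import json
import shlex

conf = {"text": "overridden successfully"}

# json.dumps guarantees the payload is valid JSON; shlex.quote protects
# the embedded double quotes from the shell, so the CLI receives it intact.
arg = shlex.quote(json.dumps(conf))
cmd = "airflow trigger_dag hello_world -c {}".format(arg)
print(cmd)
# airflow trigger_dag hello_world -c '{"text": "overridden successfully"}'
```

This avoids the classic failure mode where `-c "{"text": ...}"` reaches the CLI with the inner quotes stripped and the conf fails to parse.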
[jira] [Commented] (AIRFLOW-695) Retries do not execute because dagrun is in FAILED state
[ https://issues.apache.org/jira/browse/AIRFLOW-695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491080#comment-16491080 ] Tylar Murray commented on AIRFLOW-695: -- I believe I am seeing this issue using airflow 1.9.0. What is the fix version for this bug? Given that this was committed over a year ago I wouldn't expect to see this in 1.9.0 > Retries do not execute because dagrun is in FAILED state > > > Key: AIRFLOW-695 > URL: https://issues.apache.org/jira/browse/AIRFLOW-695 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun >Reporter: Harvey Xia >Priority: Blocker > Labels: executor, scheduler > > Currently on the latest master commit > (15ff540ecd5e60e7ce080177ea3ea227582a4672), running on the LocalExecutor, > retries on tasks do not execute because the state of the corresponding dagrun > changes to FAILED. The task instance then gets blocked because "Task > instance's dagrun was not in the 'running' state but in the state 'failed'," > the error message produced by the following lines: > https://github.com/apache/incubator-airflow/blob/master/airflow/ti_deps/deps/dagrun_exists_dep.py#L48-L50 > This error can be reproduced with the following simple DAG: > {code:title=DAG.py|borderStyle=solid} > dag = models.DAG(dag_id='test_retry_handling') > task = BashOperator( > task_id='test_retry_handling_op', > bash_command='exit 1', > retries=1, > retry_delay=datetime.timedelta(minutes=1), > dag=dag, > owner='airflow', > start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2525) Fix PostgresHook.copy_expert to work with "COPY FROM"
[ https://issues.apache.org/jira/browse/AIRFLOW-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joy Gao resolved AIRFLOW-2525. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3421 [https://github.com/apache/incubator-airflow/pull/3421] > Fix PostgresHook.copy_expert to work with "COPY FROM" > - > > Key: AIRFLOW-2525 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2525 > Project: Apache Airflow > Issue Type: Bug > Components: hooks >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > psycopg2's {{copy_expert}} [works with both "COPY FROM" and "COPY > TO"|http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert]. > But {{PostgresHook.copy_expert}} fails with "COPY FROM" as follows: > {code} > In [1]: from airflow.hooks.postgres_hook import PostgresHook > In [2]: hook = PostgresHook() > In [3]: hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > [2018-05-24 23:37:54,767] {base_hook.py:83} INFO - Using connection to: > localhost > --- > QueryCanceledErrorTraceback (most recent call last) > in () > > 1 hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > ~/dev/incubator-airflow/airflow/hooks/postgres_hook.py in copy_expert(self, > sql, filename, open) > 68 with closing(self.get_conn()) as conn: > 69 with closing(conn.cursor()) as cur: > ---> 70 cur.copy_expert(sql, f) > 71 > 72 @staticmethod > QueryCanceledError: COPY from stdin failed: error in .read() call: > UnsupportedOperation not readable > CONTEXT: COPY t, line 1 > {code} > This is because the file used in this method is opened with 'w' mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
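The root cause can be demonstrated without a database: a file opened with mode {{'w'}} is write-only, so psycopg2's COPY FROM path cannot call {{.read()}} on it, while {{'w+'}} yields a handle usable in both directions. A small standalone sketch of the mode difference the fix relies on:

```python
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), "copy_expert_demo.txt")

with open(path, "w") as f:
    # Write-only handle: this is why "COPY t FROM STDIN" failed, since
    # copy_expert must .read() the file to feed COPY FROM.
    print(f.readable(), f.writable())  # False True

with open(path, "w+") as f:
    # Read/write handle: suitable for both COPY TO and COPY FROM.
    print(f.readable(), f.writable())  # True True

os.remove(path)
```

The second half of the fix, committing the transaction after `copy_expert`, matters because COPY FROM modifies the table and is otherwise rolled back when the connection closes.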
[jira] [Commented] (AIRFLOW-2525) Fix PostgresHook.copy_expert to work with "COPY FROM"
[ https://issues.apache.org/jira/browse/AIRFLOW-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491033#comment-16491033 ] ASF subversion and git services commented on AIRFLOW-2525: -- Commit dabf1b962dcd4323a6ea723076afcd2a20fcb354 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=dabf1b9 ] [AIRFLOW-2525] Fix PostgresHook.copy_expert to work with "COPY FROM" For now PostgresHook.copy_expert supports "COPY TO" but not "COPY FROM", because it opens a file with write mode and doesn't commit operations. This PR fixes it by opening a file with read and write mode and committing operations at last. > Fix PostgresHook.copy_expert to work with "COPY FROM" > - > > Key: AIRFLOW-2525 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2525 > Project: Apache Airflow > Issue Type: Bug > Components: hooks >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > psycopg2's {{copy_expert}} [works with both "COPY FROM" and "COPY > TO"|http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert]. 
> But {{PostgresHook.copy_expert}} fails with "COPY FROM" as follows: > {code} > In [1]: from airflow.hooks.postgres_hook import PostgresHook > In [2]: hook = PostgresHook() > In [3]: hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > [2018-05-24 23:37:54,767] {base_hook.py:83} INFO - Using connection to: > localhost > --- > QueryCanceledErrorTraceback (most recent call last) > in () > > 1 hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > ~/dev/incubator-airflow/airflow/hooks/postgres_hook.py in copy_expert(self, > sql, filename, open) > 68 with closing(self.get_conn()) as conn: > 69 with closing(conn.cursor()) as cur: > ---> 70 cur.copy_expert(sql, f) > 71 > 72 @staticmethod > QueryCanceledError: COPY from stdin failed: error in .read() call: > UnsupportedOperation not readable > CONTEXT: COPY t, line 1 > {code} > This is because the file used in this method is opened with 'w' mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2525) Fix PostgresHook.copy_expert to work with "COPY FROM"
[ https://issues.apache.org/jira/browse/AIRFLOW-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491034#comment-16491034 ] ASF subversion and git services commented on AIRFLOW-2525: -- Commit 432ac718b14e2a3211386e84aece32132c2f8fce in incubator-airflow's branch refs/heads/master from Joy Gao [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=432ac71 ] Merge pull request #3421 from sekikn/AIRFLOW-2525 > Fix PostgresHook.copy_expert to work with "COPY FROM" > - > > Key: AIRFLOW-2525 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2525 > Project: Apache Airflow > Issue Type: Bug > Components: hooks >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.0.0 > > > psycopg2's {{copy_expert}} [works with both "COPY FROM" and "COPY > TO"|http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert]. > But {{PostgresHook.copy_expert}} fails with "COPY FROM" as follows: > {code} > In [1]: from airflow.hooks.postgres_hook import PostgresHook > In [2]: hook = PostgresHook() > In [3]: hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > [2018-05-24 23:37:54,767] {base_hook.py:83} INFO - Using connection to: > localhost > --- > QueryCanceledErrorTraceback (most recent call last) > in () > > 1 hook.copy_expert("COPY t FROM STDIN", "/tmp/t") > ~/dev/incubator-airflow/airflow/hooks/postgres_hook.py in copy_expert(self, > sql, filename, open) > 68 with closing(self.get_conn()) as conn: > 69 with closing(conn.cursor()) as cur: > ---> 70 cur.copy_expert(sql, f) > 71 > 72 @staticmethod > QueryCanceledError: COPY from stdin failed: error in .read() call: > UnsupportedOperation not readable > CONTEXT: COPY t, line 1 > {code} > This is because the file used in this method is opened with 'w' mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[1/2] incubator-airflow git commit: [AIRFLOW-2525] Fix PostgresHook.copy_expert to work with "COPY FROM"
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ba84b6f4a -> 432ac718b

[AIRFLOW-2525] Fix PostgresHook.copy_expert to work with "COPY FROM"

For now PostgresHook.copy_expert supports "COPY TO" but not "COPY FROM", because it opens a file with write mode and doesn't commit operations. This PR fixes it by opening a file with read and write mode and committing operations at last.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dabf1b96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dabf1b96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dabf1b96

Branch: refs/heads/master
Commit: dabf1b962dcd4323a6ea723076afcd2a20fcb354
Parents: e4e7b55
Author: Kengo Seki
Authored: Fri May 25 00:04:19 2018 -0400
Committer: Kengo Seki
Committed: Fri May 25 10:52:14 2018 -0400
--
 airflow/hooks/postgres_hook.py    |  9 +
 tests/hooks/test_postgres_hook.py | 11 +--
 2 files changed, 10 insertions(+), 10 deletions(-)
--

diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index 7e89d93..bbf125b 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -64,10 +64,11 @@ class PostgresHook(DbApiHook):
         Executes SQL using psycopg2 copy_expert method
         Necessary to execute COPY command without access to a superuser
         """
-        f = open(filename, 'w')
-        with closing(self.get_conn()) as conn:
-            with closing(conn.cursor()) as cur:
-                cur.copy_expert(sql, f)
+        with open(filename, 'w+') as f:
+            with closing(self.get_conn()) as conn:
+                with closing(conn.cursor()) as cur:
+                    cur.copy_expert(sql, f)
+                conn.commit()

     @staticmethod
     def _serialize_cell(cell, conn):

diff --git a/tests/hooks/test_postgres_hook.py b/tests/hooks/test_postgres_hook.py
index a740264..f636b5a 100644
--- a/tests/hooks/test_postgres_hook.py
+++ b/tests/hooks/test_postgres_hook.py
@@ -43,17 +43,16 @@ class TestPostgresHook(unittest.TestCase):

     def test_copy_expert(self):
         m = mock.mock_open(read_data='{"some": "json"}')
-        with mock.patch('airflow.hooks.postgres_hook.open', m, create=True) as m:
+        with mock.patch('airflow.hooks.postgres_hook.open', m):
             statement = "SQL"
             filename = "filename"
             self.cur.fetchall.return_value = None
-            f = m(filename, 'w')
-            def test_open(filename, mode):
-                return f
-            self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=test_open))
+            self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=m))
             self.conn.close.assert_called_once()
             self.cur.close.assert_called_once()
-            self.cur.copy_expert.assert_called_once_with(statement, f)
+            self.conn.commit.assert_called_once()
+            self.cur.copy_expert.assert_called_once_with(statement, m.return_value)
+            m.assert_called_once_with(filename, "w+")
[2/2] incubator-airflow git commit: Merge pull request #3421 from sekikn/AIRFLOW-2525
Merge pull request #3421 from sekikn/AIRFLOW-2525 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/432ac718 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/432ac718 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/432ac718 Branch: refs/heads/master Commit: 432ac718b14e2a3211386e84aece32132c2f8fce Parents: ba84b6f dabf1b9 Author: Joy Gao Authored: Fri May 25 10:18:13 2018 -0700 Committer: Joy Gao Committed: Fri May 25 10:18:13 2018 -0700 -- airflow/hooks/postgres_hook.py| 9 + tests/hooks/test_postgres_hook.py | 11 +-- 2 files changed, 10 insertions(+), 10 deletions(-) --
[jira] [Resolved] (AIRFLOW-1730) The value of XCom that queried from DB is not unpickled.
[ https://issues.apache.org/jira/browse/AIRFLOW-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor resolved AIRFLOW-1730. Resolution: Fixed Fix Version/s: (was: 1.10) 1.10.0 Issue resolved by pull request #2701 [https://github.com/apache/incubator-airflow/pull/2701] > The value of XCom that queried from DB is not unpickled. > > > Key: AIRFLOW-1730 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1730 > Project: Apache Airflow > Issue Type: Bug > Components: models, xcom >Affects Versions: 1.9.0 >Reporter: Shintaro Murakami >Priority: Major > Fix For: 1.10.0 > > Attachments: xcoms_by_example_xcom.png > > > If enable_xcom_pickling is True, the value of XCom that queried from DB is > not unpickled. > The list of XComs not rendered correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
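The deserialization logic behind this fix can be sketched standalone (a hedged sketch mirroring the patch; the function name `deserialize_xcom` is illustrative, not Airflow's API): try JSON first, and fall back to pickle so legacy rows written while pickling was enabled still render.

```python
import json
import pickle


def deserialize_xcom(blob, enable_pickling=False):
    """Decode a raw XCom value as read from the DB.

    With pickling enabled, every value is a pickle. Otherwise try JSON
    first and fall back to pickle, so a table holding a mix of pickled
    and JSON-encoded values does not break the webserver's XCom list.
    """
    if enable_pickling:
        return pickle.loads(blob)
    try:
        return json.loads(blob.decode("utf-8"))
    except ValueError:  # includes UnicodeDecodeError for pickle bytes
        return pickle.loads(blob)


print(deserialize_xcom(b'{"key": 1}'))             # {'key': 1}
print(deserialize_xcom(pickle.dumps({"key": 1})))  # {'key': 1}
```

The fallback works because pickle's binary framing (bytes such as 0x80) is not valid UTF-8, so legacy rows fail the JSON path cleanly and are routed to `pickle.loads`.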
[jira] [Commented] (AIRFLOW-1730) The value of XCom that queried from DB is not unpickled.
[ https://issues.apache.org/jira/browse/AIRFLOW-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490446#comment-16490446 ] ASF subversion and git services commented on AIRFLOW-1730: -- Commit 5a3618701d838a974f532ffbd7658f327a801661 in incubator-airflow's branch refs/heads/v1-10-test from [~mrkm4ntr] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5a36187 ] [AIRFLOW-1730] Unpickle value of XCom queried from DB Cherry picked from #2701 > The value of XCom that queried from DB is not unpickled. > > > Key: AIRFLOW-1730 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1730 > Project: Apache Airflow > Issue Type: Bug > Components: models, xcom >Affects Versions: 1.9.0 >Reporter: Shintaro Murakami >Priority: Major > Fix For: 1.10 > > Attachments: xcoms_by_example_xcom.png > > > If enable_xcom_pickling is True, the value of XCom that queried from DB is > not unpickled. > The list of XComs not rendered correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-1730] Unpickle value of XCom queried from DB
Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test 2a8358353 -> 5a3618701 [AIRFLOW-1730] Unpickle value of XCom queried from DB Cherry picked from #2701 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5a361870 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5a361870 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5a361870 Branch: refs/heads/v1-10-test Commit: 5a3618701d838a974f532ffbd7658f327a801661 Parents: 2a83583 Author: Shintaro Murakami Authored: Tue Oct 17 18:40:19 2017 +0900 Committer: Ash Berlin-Taylor Committed: Fri May 25 10:10:04 2018 +0100 -- airflow/models.py | 80 +- tests/models.py | 59 - 2 files changed, 72 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a361870/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 1a66eb1..3077e6d 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4349,6 +4349,24 @@ class XCom(Base, LoggingMixin): Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False), ) +""" +TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. +""" +@reconstructor +def init_on_load(self): +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +self.value = pickle.loads(self.value) +else: +try: +self.value = json.loads(self.value.decode('UTF-8')) +except (UnicodeEncodeError, ValueError): +# For backward-compatibility. +# Preventing errors in webserver +# due to XComs mixed with pickled and unpickled. +self.value = pickle.loads(self.value) + def __repr__(self): return ''.format( key=self.key, @@ -4364,23 +4382,16 @@ class XCom(Base, LoggingMixin): execution_date, task_id, dag_id, -enable_pickling=None, session=None): """ Store an XCom value. -TODO: "pickling" has been deprecated and JSON is preferred. 
"pickling" will be -removed in Airflow 2.0. :param enable_pickling: If pickling is not enabled, the -XCOM value will be parsed as JSON instead. - +TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. :return: None """ session.expunge_all() -if enable_pickling is None: -enable_pickling = configuration.conf.getboolean( -'core', 'enable_xcom_pickling' -) - +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') if enable_pickling: value = pickle.dumps(value) else: @@ -4422,13 +4433,11 @@ class XCom(Base, LoggingMixin): task_id=None, dag_id=None, include_prior_dates=False, -enable_pickling=None, session=None): """ Retrieve an XCom value, optionally meeting certain criteria. -TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0. - -:param enable_pickling: If pickling is not enabled, the XCOM value will be parsed to JSON instead. +TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. :return: XCom value """ filters = [] @@ -4450,11 +4459,7 @@ class XCom(Base, LoggingMixin): result = query.first() if result: -if enable_pickling is None: -enable_pickling = configuration.conf.getboolean( -'core', 'enable_xcom_pickling' -) - +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') if enable_pickling: return pickle.loads(result.value) else: @@ -4462,7 +4467,7 @@ class XCom(Base, LoggingMixin): return json.loads(result.value.decode('UTF-8')) except ValueError: log = LoggingMixin().log -log.error("Could not serialize the XCOM value into JSON. " +log.error("Could not deserialize the XCOM value from JSON. " "If you are using pickles instead of JSON " "for XCOM, t
[jira] [Commented] (AIRFLOW-1730) The value of XCom that queried from DB is not unpickled.
[ https://issues.apache.org/jira/browse/AIRFLOW-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490436#comment-16490436 ] ASF subversion and git services commented on AIRFLOW-1730: -- Commit c6deeb2ff453ba50ff75c315d34110f6f4a886a5 in incubator-airflow's branch refs/heads/master from [~mrkm4ntr] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=c6deeb2 ] [AIRFLOW-1730] Unpickle value of XCom queried from DB > The value of XCom that queried from DB is not unpickled. > > > Key: AIRFLOW-1730 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1730 > Project: Apache Airflow > Issue Type: Bug > Components: models, xcom >Affects Versions: 1.9.0 >Reporter: Shintaro Murakami >Priority: Major > Fix For: 1.10 > > Attachments: xcoms_by_example_xcom.png > > > If enable_xcom_pickling is True, the value of XCom that queried from DB is > not unpickled. > The list of XComs not rendered correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[1/2] incubator-airflow git commit: [AIRFLOW-1730] Unpickle value of XCom queried from DB
Repository: incubator-airflow Updated Branches: refs/heads/master c97ad4363 -> ba84b6f4a [AIRFLOW-1730] Unpickle value of XCom queried from DB Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6deeb2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6deeb2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6deeb2f Branch: refs/heads/master Commit: c6deeb2ff453ba50ff75c315d34110f6f4a886a5 Parents: e4e7b55 Author: Shintaro Murakami Authored: Tue Oct 17 18:40:19 2017 +0900 Committer: Shintaro Murakami Committed: Fri May 25 15:22:09 2018 +0900 -- airflow/models.py | 61 +++--- tests/models.py | 59 +++- 2 files changed, 62 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6deeb2f/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index da18ec7..2fd05cb 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4397,6 +4397,24 @@ class XCom(Base, LoggingMixin): Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False), ) +""" +TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. +""" +@reconstructor +def init_on_load(self): +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +self.value = pickle.loads(self.value) +else: +try: +self.value = json.loads(self.value.decode('UTF-8')) +except (UnicodeEncodeError, ValueError): +# For backward-compatibility. +# Preventing errors in webserver +# due to XComs mixed with pickled and unpickled. +self.value = pickle.loads(self.value) + def __repr__(self): return ''.format( key=self.key, @@ -4412,23 +4430,16 @@ class XCom(Base, LoggingMixin): execution_date, task_id, dag_id, -enable_pickling=None, session=None): """ Store an XCom value. -TODO: "pickling" has been deprecated and JSON is preferred. 
"pickling" will be -removed in Airflow 2.0. :param enable_pickling: If pickling is not enabled, the -XCOM value will be parsed as JSON instead. - +TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. :return: None """ session.expunge_all() -if enable_pickling is None: -enable_pickling = configuration.conf.getboolean( -'core', 'enable_xcom_pickling' -) - +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') if enable_pickling: value = pickle.dumps(value) else: @@ -4469,15 +4480,11 @@ class XCom(Base, LoggingMixin): task_id=None, dag_id=None, include_prior_dates=False, -enable_pickling=None, session=None): """ Retrieve an XCom value, optionally meeting certain criteria. TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0. - -:param enable_pickling: If pickling is not enabled, -the XCOM value will be parsed to JSON instead. :return: XCom value """ filters = [] @@ -4498,11 +4505,7 @@ class XCom(Base, LoggingMixin): result = query.first() if result: -if enable_pickling is None: -enable_pickling = configuration.conf.getboolean( -'core', 'enable_xcom_pickling' -) - +enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') if enable_pickling: return pickle.loads(result.value) else: @@ -4510,7 +4513,7 @@ class XCom(Base, LoggingMixin): return json.loads(result.value.decode('UTF-8')) except ValueError: log = LoggingMixin().log -log.error("Could not serialize the XCOM value into JSON. " +log.error("Could not deserialize the XCOM value from JSON. " "If you are using pickles instead of JSON " "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.
[2/2] incubator-airflow git commit: Merge pull request #2701 from mrkm4ntr/airflow-1730
Merge pull request #2701 from mrkm4ntr/airflow-1730 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ba84b6f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ba84b6f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ba84b6f4 Branch: refs/heads/master Commit: ba84b6f4a984f0a36c6491ddbae14fc491af31c6 Parents: c97ad43 c6deeb2 Author: Ash Berlin-Taylor Authored: Fri May 25 09:53:51 2018 +0100 Committer: Ash Berlin-Taylor Committed: Fri May 25 09:53:51 2018 +0100 -- airflow/models.py | 61 +++--- tests/models.py | 59 +++- 2 files changed, 62 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ba84b6f4/airflow/models.py --
[jira] [Resolved] (AIRFLOW-2515) Add dependency on thrift_sasl so that HiveServer2Hook works
[ https://issues.apache.org/jira/browse/AIRFLOW-2515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2515. --- Resolution: Fixed Fix Version/s: 2.0.0 1.10.0 > Add dependency on thrift_sasl so that HiveServer2Hook works > --- > > Key: AIRFLOW-2515 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2515 > Project: Apache Airflow > Issue Type: Bug > Components: dependencies, hive_hooks, hooks >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 1.10.0, 2.0.0 > > > Installing "hive" extra does not require thrift_sasl module for now: > {code} > $ pip install --upgrade -e ".[hive]" > (snip) > Successfully installed apache-airflow > $ pip show thrift_sasl > $ > {code} > But in fact, HiveServer2Hook (more precisely, impyla on which HiveServer2Hook > depends) requires that module, even if kerberos is disabled. > {code} > $ ipython > Python 3.5.2 (default, Nov 23 2017, 16:37:01) > Type 'copyright', 'credits' or 'license' for more information > IPython 6.3.1 -- An enhanced Interactive Python. Type '?' for help. > In [1]: from airflow.hooks.hive_hooks import HiveServer2Hook > In [2]: h = HiveServer2Hook() > In [3]: conn = h.get_conn() > [2018-05-23 11:42:30,452] {base_hook.py:83} INFO - Using connection to: > localhost > --- > ImportError Traceback (most recent call last) > (snip) > 147 > 148 # Initializes a sasl client > --> 149 from thrift_sasl import TSaslClientTransport > 150 try: > 151 import sasl # pylint: disable=import-error > ImportError: No module named 'thrift_sasl' > {code} > This is also [documented|https://github.com/cloudera/impyla#dependencies] in > impyla's README: > {quote} > For Hive and/or Kerberos support: > ``` > pip install thrift_sasl==0.2.1 > pip install sasl > ``` > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
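Missing optional dependencies like this only surface at hook runtime, long after {{pip install}} succeeds. A small preflight helper (an illustrative sketch, not part of Airflow) can report which runtime modules of an extra are actually importable:

```python
def missing_modules(names):
    """Return the subset of module names that cannot be imported.

    Useful as a preflight check for optional extras such as the 'hive'
    extra, whose HiveServer2Hook needs impyla, thrift_sasl, and sasl at
    runtime even though (before this fix) pip did not install them all.
    """
    missing = []
    for name in names:
        try:
            __import__(name)
        except ImportError:
            missing.append(name)
    return missing


# In the broken environment from the report this would include
# 'thrift_sasl'; stdlib modules are always present:
print(missing_modules(["json", "os"]))  # []
```

Running `missing_modules(["impala", "thrift_sasl", "sasl"])` right after installing the extra would have flagged the gap before any DAG hit the ImportError.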
[jira] [Commented] (AIRFLOW-2515) Add dependency on thrift_sasl so that HiveServer2Hook works
[ https://issues.apache.org/jira/browse/AIRFLOW-2515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490429#comment-16490429 ] ASF subversion and git services commented on AIRFLOW-2515: -- Commit c97ad43634e2b24785283c50829ed9236a9bc69d in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=c97ad43 ] [AIRFLOW-2515] Add dependency on thrift_sasl to hive extra This PR adds a dependency on thrift_sasl to hive extra so that HiveServer2Hook.get_conn() works. Closes #3408 from sekikn/AIRFLOW-2515 > Add dependency on thrift_sasl so that HiveServer2Hook works > --- > > Key: AIRFLOW-2515 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2515 > Project: Apache Airflow > Issue Type: Bug > Components: dependencies, hive_hooks, hooks >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > > Installing "hive" extra does not require thrift_sasl module for now: > {code} > $ pip install --upgrade -e ".[hive]" > (snip) > Successfully installed apache-airflow > $ pip show thrift_sasl > $ > {code} > But in fact, HiveServer2Hook (more precisely, impyla on which HiveServer2Hook > depends) requires that module, even if kerberos is disabled. > {code} > $ ipython > Python 3.5.2 (default, Nov 23 2017, 16:37:01) > Type 'copyright', 'credits' or 'license' for more information > IPython 6.3.1 -- An enhanced Interactive Python. Type '?' for help. 
> In [1]: from airflow.hooks.hive_hooks import HiveServer2Hook > In [2]: h = HiveServer2Hook() > In [3]: conn = h.get_conn() > [2018-05-23 11:42:30,452] {base_hook.py:83} INFO - Using connection to: > localhost > --- > ImportError Traceback (most recent call last) > (snip) > 147 > 148 # Initializes a sasl client > --> 149 from thrift_sasl import TSaslClientTransport > 150 try: > 151 import sasl # pylint: disable=import-error > ImportError: No module named 'thrift_sasl' > {code} > This is also [documented|https://github.com/cloudera/impyla#dependencies] in > impyla's README: > {quote} > For Hive and/or Kerberos support: > ``` > pip install thrift_sasl==0.2.1 > pip install sasl > ``` > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2515] Add dependency on thrift_sasl to hive extra
Repository: incubator-airflow Updated Branches: refs/heads/master 4c0d67f0d -> c97ad4363 [AIRFLOW-2515] Add dependency on thrift_sasl to hive extra This PR adds a dependency on thrift_sasl to hive extra so that HiveServer2Hook.get_conn() works. Closes #3408 from sekikn/AIRFLOW-2515 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c97ad436 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c97ad436 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c97ad436 Branch: refs/heads/master Commit: c97ad43634e2b24785283c50829ed9236a9bc69d Parents: 4c0d67f Author: Kengo Seki Authored: Fri May 25 10:45:23 2018 +0200 Committer: Fokko Driesprong Committed: Fri May 25 10:45:23 2018 +0200 -- setup.py | 1 + tests/hooks/test_hive_hook.py | 8 +++- 2 files changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c97ad436/setup.py -- diff --git a/setup.py b/setup.py index dcd59af..273d1ce 100644 --- a/setup.py +++ b/setup.py @@ -156,6 +156,7 @@ hive = [ 'hmsclient>=0.1.0', 'pyhive>=0.1.3', 'impyla>=0.13.3', +'thrift_sasl==0.2.1', 'unicodecsv>=0.14.1' ] jdbc = ['jaydebeapi>=1.1.1'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c97ad436/tests/hooks/test_hive_hook.py -- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py index f1a3b51..b0029ea 100644 --- a/tests/hooks/test_hive_hook.py +++ b/tests/hooks/test_hive_hook.py @@ -30,7 +30,7 @@ from collections import OrderedDict from hmsclient import HMSClient from airflow.exceptions import AirflowException -from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook +from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook, HiveServer2Hook from airflow import DAG, configuration from airflow.operators.hive_operator import HiveOperator from airflow.utils import timezone @@ -317,3 +317,9 @@ class 
TestHiveMetastoreHook(HiveEnvironmentTest):
         self.assertFalse(
             self.hook.table_exists(str(random.randint(1, 1)))
         )
+
+
+class TestHiveServer2Hook(unittest.TestCase):
+    def test_get_conn(self):
+        hook = HiveServer2Hook()
+        hook.get_conn()
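The one-line fix pins thrift_sasl inside the `hive` extra. A sketch of how that extra is declared in `setup.py` (abridged from the diff above; in the real file the dict is passed to `setup(..., extras_require=...)`):

```python
# Dependencies of the "hive" extra, as amended by this commit. Installing
# `pip install 'apache-airflow[hive]'` pulls in everything listed here.
hive = [
    'hmsclient>=0.1.0',
    'pyhive>=0.1.3',
    'impyla>=0.13.3',
    'thrift_sasl==0.2.1',  # the line this commit adds, so get_conn() works
    'unicodecsv>=0.14.1',
]

extras_require = {'hive': hive}
```

Pinning `==0.2.1` matches the version impyla's own README recommends.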
[jira] [Commented] (AIRFLOW-2523) Add how-to guide section for managing GCP connections
[ https://issues.apache.org/jira/browse/AIRFLOW-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490420#comment-16490420 ] ASF subversion and git services commented on AIRFLOW-2523: -- Commit 4c0d67f0d0094a5ef8a5ec2407fe91d16af01129 in incubator-airflow's branch refs/heads/master from [~swast] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4c0d67f ] [AIRFLOW-2523] Add how-to for managing GCP connections I'd like to have how-to guides for all connection types, or at least the different categories of connection types. I found it difficult to figure out how to manage a GCP connection, so this commit adds a how-to guide for it. Also, since creating and editing connections really aren't all that different, the PR renames the "creating connections" how-to to "managing connections". Closes #3419 from tswast/howto > Add how-to guide section for managing GCP connections > - > > Key: AIRFLOW-2523 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2523 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib, Documentation >Reporter: Tim Swast >Assignee: Tim Swast >Priority: Minor > Fix For: 2.0.0 > > > I'd like to have how-to guides for all connection types, or at least the > different categories of connection types. I found it difficult to figure out > how to manage a GCP connection, so a PR is coming soon which adds a guide for > this. > Also, since creating and editing connections really aren't all that > different, the PR renames the "creating connections" how-to to "managing > connections". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2523] Add how-to for managing GCP connections
Repository: incubator-airflow Updated Branches: refs/heads/master 66f00bbf7 -> 4c0d67f0d [AIRFLOW-2523] Add how-to for managing GCP connections I'd like to have how-to guides for all connection types, or at least the different categories of connection types. I found it difficult to figure out how to manage a GCP connection, so this commit adds a how-to guide for it. Also, since creating and editing connections really aren't all that different, the PR renames the "creating connections" how-to to "managing connections". Closes #3419 from tswast/howto Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c0d67f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c0d67f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c0d67f0 Branch: refs/heads/master Commit: 4c0d67f0d0094a5ef8a5ec2407fe91d16af01129 Parents: 66f00bb Author: Tim Swast Authored: Fri May 25 09:37:29 2018 +0100 Committer: Kaxil Naik Committed: Fri May 25 09:37:29 2018 +0100 -- docs/concepts.rst | 28 --- docs/howto/create-connection.rst | 8 -- docs/howto/index.rst | 2 +- docs/howto/manage-connections.rst | 135 + docs/howto/secure-connections.rst | 7 -- docs/img/connection_create.png| Bin 0 -> 41547 bytes docs/img/connection_edit.png | Bin 0 -> 53636 bytes docs/img/connections.png | Bin 93057 -> 48442 bytes docs/integration.rst | 3 + 9 files changed, 152 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c0d67f0/docs/concepts.rst -- diff --git a/docs/concepts.rst b/docs/concepts.rst index c28b10f..866f916 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -308,6 +308,8 @@ UI. As slots free up, queued tasks start running based on the Note that by default tasks aren't assigned to any pool and their execution parallelism is only limited to the executor's setting. +..
_concepts-connections: + Connections === @@ -324,16 +326,12 @@ from ``BaseHook``, Airflow will choose one connection randomly, allowing for some basic load balancing and fault tolerance when used in conjunction with retries. -Airflow also has the ability to reference connections via environment -variables from the operating system. The environment variable needs to be -prefixed with ``AIRFLOW_CONN_`` to be considered a connection. When -referencing the connection in the Airflow pipeline, the ``conn_id`` should -be the name of the variable without the prefix. For example, if the ``conn_id`` -is named ``postgres_master`` the environment variable should be named -``AIRFLOW_CONN_POSTGRES_MASTER`` (note that the environment variable must be -all uppercase). Airflow assumes the value returned from the environment -variable to be in a URI format (e.g. -``postgres://user:password@localhost:5432/master`` or ``s3://accesskey:secretkey@S3``). +Many hooks have a default ``conn_id``, where operators using that hook do not +need to supply an explicit connection ID. For example, the default +``conn_id`` for the :class:`~airflow.hooks.postgres_hook.PostgresHook` is +``postgres_default``. + +See :doc:`howto/manage-connections` for how to create and manage connections. Queues == @@ -410,7 +408,7 @@ Variables Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. Variables can be listed, created, updated and deleted from the UI (``Admin -> Variables``), -code or CLI. In addition, json settings files can be bulk uploaded through +code or CLI. In addition, json settings files can be bulk uploaded through the UI. While your pipeline code definition and most of your constants and variables should be defined in code and stored in source control, it can be useful to have some variables or configuration items @@ -427,18 +425,18 @@ The second call assumes ``json`` content and will be deserialized into ``bar``. 
Note that ``Variable`` is a sqlalchemy model and can be used as such. -You can use a variable from a jinja template with the syntax : +You can use a variable from a jinja template with the syntax : .. code:: bash echo {{ var.value. }} - -or if you need to deserialize a json object from the variable : + +or if you need to deserialize a json object from the variable : .. code:: bash echo {{ var.json. }} - + Branching = http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c0d67f0/docs/howto/create-connection.rst -- diff --git a/docs/howto/create-connection.rst b/docs/howto/create
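The paragraph the diff removes describes the `AIRFLOW_CONN_` convention: the environment variable's value is a URI such as `postgres://user:password@localhost:5432/master`. A standalone sketch of how such a URI decomposes into connection fields, using only the standard library (this is an illustration of the URI format, not Airflow's own parsing code):

```python
from urllib.parse import urlparse


def parse_conn_uri(uri):
    """Split an AIRFLOW_CONN_-style URI into its connection fields."""
    parsed = urlparse(uri)
    return {
        'conn_type': parsed.scheme,          # e.g. "postgres"
        'login': parsed.username,
        'password': parsed.password,
        'host': parsed.hostname,
        'port': parsed.port,                 # None if the URI omits a port
        'schema': parsed.path.lstrip('/'),   # database name after the slash
    }


# The example URI from the removed docs paragraph:
fields = parse_conn_uri('postgres://user:password@localhost:5432/master')
```

So setting `AIRFLOW_CONN_POSTGRES_MASTER` to that URI corresponds to a `conn_id` of `postgres_master` with these fields.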
[jira] [Resolved] (AIRFLOW-2523) Add how-to guide section for managing GCP connections
[ https://issues.apache.org/jira/browse/AIRFLOW-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik resolved AIRFLOW-2523. - Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3419 [https://github.com/apache/incubator-airflow/pull/3419] > Add how-to guide section for managing GCP connections > - > > Key: AIRFLOW-2523 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2523 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib, Documentation >Reporter: Tim Swast >Assignee: Tim Swast >Priority: Minor > Fix For: 2.0.0 > > > I'd like to have how-to guides for all connection types, or at least the > different categories of connection types. I found it difficult to figure out > how to manage a GCP connection, so a PR is coming soon which adds a guide for > this. > Also, since creating and editing connections really aren't all that > different, the PR renames the "creating connections" how-to to "managing > connections". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2510) Introduce new macros: prev_ds and next_ds
[ https://issues.apache.org/jira/browse/AIRFLOW-2510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2510. --- Resolution: Fixed Fix Version/s: 2.0.0 1.10.0 Issue resolved by pull request #3418 [https://github.com/apache/incubator-airflow/pull/3418] > Introduce new macros: prev_ds and next_ds > - > > Key: AIRFLOW-2510 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2510 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Chao-Han Tsai >Assignee: Chao-Han Tsai >Priority: Major > Fix For: 1.10.0, 2.0.0 > > > Introduce new macros {{ prev_ds }} and {{ next_ds }}. > {{ prev_ds }}: the previous execution date as {{ YYYY-MM-DD }} > {{ next_ds }}: the next execution date as {{ YYYY-MM-DD }} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2510) Introduce new macros: prev_ds and next_ds
[ https://issues.apache.org/jira/browse/AIRFLOW-2510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490394#comment-16490394 ] ASF subversion and git services commented on AIRFLOW-2510: -- Commit 66f00bbf7b296823fbe35c157d839da321aa6162 in incubator-airflow's branch refs/heads/master from [~milton0825] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=66f00bb ] [AIRFLOW-2510] Introduce new macros: prev_ds and next_ds Closes #3418 from milton0825/introduce-next_ds-prev_ds > Introduce new macros: prev_ds and next_ds > - > > Key: AIRFLOW-2510 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2510 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Chao-Han Tsai >Assignee: Chao-Han Tsai >Priority: Major > > Introduce new macros {{ prev_ds }} and {{ next_ds }}. > {{ prev_ds }}: the previous execution date as {{ YYYY-MM-DD }} > {{ next_ds }}: the next execution date as {{ YYYY-MM-DD }} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2510] Introduce new macros: prev_ds and next_ds
Repository: incubator-airflow Updated Branches: refs/heads/master e4e7b55ad -> 66f00bbf7 [AIRFLOW-2510] Introduce new macros: prev_ds and next_ds Closes #3418 from milton0825/introduce-next_ds- prev_ds Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66f00bbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66f00bbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66f00bbf Branch: refs/heads/master Commit: 66f00bbf7b296823fbe35c157d839da321aa6162 Parents: e4e7b55 Author: Chao-Han Tsai Authored: Fri May 25 10:13:49 2018 +0200 Committer: Fokko Driesprong Committed: Fri May 25 10:13:49 2018 +0200 -- airflow/models.py | 16 +--- docs/code.rst | 6 ++ 2 files changed, 19 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f00bbf/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index da18ec7..afd6e70 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1751,14 +1751,22 @@ class TaskInstance(Base, LoggingMixin): if 'tables' in task.params: tables = task.params['tables'] -ds = self.execution_date.isoformat()[:10] +ds = self.execution_date.strftime('%Y-%m-%d') ts = self.execution_date.isoformat() -yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10] -tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10] +yesterday_ds = (self.execution_date - timedelta(1)).strftime('%Y-%m-%d') +tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d') prev_execution_date = task.dag.previous_schedule(self.execution_date) next_execution_date = task.dag.following_schedule(self.execution_date) +next_ds = None +if next_execution_date: +next_ds = next_execution_date.strftime('%Y-%m-%d') + +prev_ds = None +if prev_execution_date: +prev_ds = prev_execution_date.strftime('%Y-%m-%d') + ds_nodash = ds.replace('-', '') ts_nodash = ts.replace('-', 
'').replace(':', '') yesterday_ds_nodash = yesterday_ds.replace('-', '') @@ -1820,6 +1828,8 @@ class TaskInstance(Base, LoggingMixin): return { 'dag': task.dag, 'ds': ds, +'next_ds': next_ds, +'prev_ds': prev_ds, 'ds_nodash': ds_nodash, 'ts': ts, 'ts_nodash': ts_nodash, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f00bbf/docs/code.rst -- diff --git a/docs/code.rst b/docs/code.rst index 1737d15..5343509 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -231,6 +231,12 @@ Variable Description ``{{ ds }}`` the execution date as ``YYYY-MM-DD`` ``{{ ds_nodash }}`` the execution date as ``YYYYMMDD`` +``{{ prev_ds }}`` the previous execution date as ``YYYY-MM-DD``. +if ``{{ ds }}`` is ``2016-01-08`` and ``schedule_interval`` is ``@weekly``, +``{{ prev_ds }}`` will be ``2016-01-01``. +``{{ next_ds }}`` the next execution date as ``YYYY-MM-DD``. +if ``{{ ds }}`` is ``2016-01-01`` and ``schedule_interval`` is ``@weekly``, +``{{ next_ds }}`` will be ``2016-01-08``. ``{{ yesterday_ds }}`` yesterday's date as ``YYYY-MM-DD`` ``{{ yesterday_ds_nodash }}`` yesterday's date as ``YYYYMMDD`` ``{{ tomorrow_ds }}`` tomorrow's date as ``YYYY-MM-DD``
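The models.py diff derives the new macros by formatting the schedule-adjacent execution dates with `strftime('%Y-%m-%d')`, falling back to `None` when the DAG has no previous or next schedule. A simplified sketch of that template-context logic (real Airflow computes the prev/next dates from the DAG's `schedule_interval`; here they are passed in directly):

```python
from datetime import datetime, timedelta


def build_ds_macros(execution_date, prev_execution_date=None,
                    next_execution_date=None):
    """Mimic the context-building logic in the diff above, simplified."""
    fmt = '%Y-%m-%d'
    context = {
        'ds': execution_date.strftime(fmt),
        'yesterday_ds': (execution_date - timedelta(1)).strftime(fmt),
        'tomorrow_ds': (execution_date + timedelta(1)).strftime(fmt),
        # prev_ds / next_ds are None when there is no adjacent schedule,
        # matching the diff's `if prev_execution_date:` guards.
        'prev_ds': prev_execution_date.strftime(fmt) if prev_execution_date else None,
        'next_ds': next_execution_date.strftime(fmt) if next_execution_date else None,
    }
    context['ds_nodash'] = context['ds'].replace('-', '')
    return context


# The docs' @weekly example around 2016-01-08:
ctx = build_ds_macros(datetime(2016, 1, 8),
                      prev_execution_date=datetime(2016, 1, 1),
                      next_execution_date=datetime(2016, 1, 15))
```

Note yesterday_ds/tomorrow_ds remain calendar-day offsets, while prev_ds/next_ds follow the schedule interval, which is exactly why the new macros are useful for non-daily DAGs.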
[jira] [Comment Edited] (AIRFLOW-2517) backfill support passing key values through CLI
[ https://issues.apache.org/jira/browse/AIRFLOW-2517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489929#comment-16489929 ] Jacques Gaudin edited comment on AIRFLOW-2517 at 5/25/18 7:39 AM: -- The same could be done for `test`. Please see this SO question: [https://stackoverflow.com/questions/50132215/set-dag-run-conf-parameters-in-call-to-airflow-test/50360218#50360218] was (Author: jacques gaudin): The same could be done for `test`. Please see this SO question: https://issues.apache.org/jira/browse/AIRFLOW-2517 > backfill support passing key values through CLI > --- > > Key: AIRFLOW-2517 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2517 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Chao-Han Tsai >Assignee: Chao-Han Tsai >Priority: Major > > In backfill, we can provide key-value pairs through the CLI and those pairs can > be accessed through macros. This is just like the way `trigger_dag -c` works > [1]. > Let's walk through an example. > In the airflow CLI we specify a key-value pair. > {code:java} > airflow backfill hello_world -s 2018-02-01 -e 2018-02-08 -c '{"text": "some text"}' > {code} > In the DAG file, I have a `BashOperator` that contains a template command, and > I want {{ dag_run.conf.text }} to resolve to the text I passed in the CLI. > {code:java} > templated_command = """ > echo "ds = {{ ds }}" > echo "prev_ds = {{ macros.datetime.strftime(prev_execution_date, "%Y-%m-%d") > }}" > echo "next_ds = {{ macros.datetime.strftime(next_execution_date, "%Y-%m-%d") > }}" > echo "text_through_conf = {{ dag_run.conf.text }}" > """ > bash_operator = BashOperator( > task_id='bash_task', > bash_command=templated_command, > dag=dag > ) > {code} > [1] [https://airflow.apache.org/cli.html#trigger_dag] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
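The shell quoting in the `-c` example matters: the JSON payload must survive as a single argument and still parse. A minimal sketch of how a correctly quoted `-c` payload round-trips into a dict, using only the standard library (`shlex` stands in for the shell's tokenizing; this is not Airflow's actual argument parser):

```python
import json
import shlex

# The backfill invocation from the issue, with the JSON payload wrapped in
# single quotes so the shell passes it through as one argument.
cmdline = """backfill hello_world -s 2018-02-01 -e 2018-02-08 -c '{"text": "some text"}'"""

argv = shlex.split(cmdline)                    # tokenize like a POSIX shell
conf = json.loads(argv[argv.index('-c') + 1])  # the argument after -c is JSON
```

With the quoting from the original example (single quote opened but never closed, no quotes protecting the inner double quotes), the shell would either report an unterminated quote or split the JSON across arguments, and `json.loads` would fail.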