[jira] [Created] (AIRFLOW-6528) disable W503 flake8 check (line break before binary operator)
Daniel Standish created AIRFLOW-6528: Summary: disable W503 flake8 check (line break before binary operator) Key: AIRFLOW-6528 URL: https://issues.apache.org/jira/browse/AIRFLOW-6528 Project: Apache Airflow Issue Type: Bug Components: pre-commit Affects Versions: 1.10.7 Reporter: Daniel Standish Flake8's W503 rule says there should be no line break before a binary operator. This rule is incompatible with the black formatter, and is also, in my opinion, bad style. Status quo example with the W503 check enabled (operators trail the line):
{code}
@property
def sqlalchemy_scheme(self):
    """
    Database provided in init if exists; otherwise, ``schema`` from ``Connection`` object.
    """
    return (
        self._sqlalchemy_scheme or
        self.connection_extra_lower.get('sqlalchemy_scheme') or
        self.DEFAULT_SQLALCHEMY_SCHEME
    )
{code}
As required by black (W503 disabled, operators lead the line):
{code}
@property
def sqlalchemy_scheme(self):
    """
    Database provided in init if exists; otherwise, ``schema`` from ``Connection`` object.
    """
    return (
        self._sqlalchemy_scheme
        or self.connection_extra_lower.get('sqlalchemy_scheme')
        or self.DEFAULT_SQLALCHEMY_SCHEME
    )
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
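For reference, disabling the check would amount to a one-line flake8 config change. This is a minimal sketch assuming the project configures flake8 in setup.cfg with flake8 >= 3.7 (the actual file and section Airflow uses may differ):

```ini
[flake8]
# W503 (line break before binary operator) conflicts with black, which
# breaks lines before operators; extend-ignore preserves flake8's
# default ignore list rather than replacing it.
extend-ignore = W503
```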
[jira] [Created] (AIRFLOW-6517) make merge_dicts recursive
Daniel Standish created AIRFLOW-6517: Summary: make merge_dicts recursive Key: AIRFLOW-6517 URL: https://issues.apache.org/jira/browse/AIRFLOW-6517 Project: Apache Airflow Issue Type: Bug Components: utils Affects Versions: 1.10.7 Reporter: Daniel Standish Assignee: Daniel Standish -- This message was sent by Atlassian Jira (v8.3.4#803005)
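The issue has no description, but the summary implies merging nested dicts level by level rather than overwriting whole sub-dicts. An illustrative sketch of a recursive merge (names and behavior are assumptions, not the actual Airflow utility):

```python
def merge_dicts(dict1, dict2):
    """Return a copy of dict1 updated with dict2, merging nested dicts
    recursively instead of replacing them wholesale."""
    merged = dict1.copy()
    for key, value in dict2.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            # Both sides hold a dict at this key: recurse instead of overwriting.
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged
```

With a non-recursive merge, `{'a': {'y': 3}}` would clobber `{'a': {'x': 1}}` entirely; the recursive version keeps both keys under `'a'`.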
[jira] [Created] (AIRFLOW-6398) improve flakey test test_mark_success_no_kill
Daniel Standish created AIRFLOW-6398: Summary: improve flakey test test_mark_success_no_kill Key: AIRFLOW-6398 URL: https://issues.apache.org/jira/browse/AIRFLOW-6398 Project: Apache Airflow Issue Type: Bug Components: tests Affects Versions: 1.10.7 Reporter: Daniel Standish Assignee: Daniel Standish The test test_mark_success_no_kill fails regularly. Part of the problem is that it depends on the timing of a subprocess and database operations. We can reduce complexity by using the python operator instead of the bash operator. Currently it uses the bash operator to call {{sleep 600}}, which introduces an unnecessary layer of multiprocessing. There is also a bug in the bash operator which was exacerbating the problem [AIRFLOW-6397]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6397) Check for `sub_process` before trying to get pid in bash operator on kill
Daniel Standish created AIRFLOW-6397: Summary: Check for `sub_process` before trying to get pid in bash operator on kill Key: AIRFLOW-6397 URL: https://issues.apache.org/jira/browse/AIRFLOW-6397 Project: Apache Airflow Issue Type: Bug Components: operators Affects Versions: 1.10.7 Reporter: Daniel Standish Assignee: Daniel Standish Test {{test_mark_success_no_kill}} in {{TestLocalTaskJob}} is very flakey. I found that one reason is that the test may attempt to kill the task before the subprocess is created and stored as an attribute. For example:
{code}
[2019-12-29 15:03:51,963] {bash_operator.py:116} INFO - Running command: sleep 600
[2019-12-29 15:03:51,967] {helpers.py:315} INFO - Sending Signals.SIGTERM to GPID 57093
[2019-12-29 15:03:51,968] {taskinstance.py:913} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-12-29 15:03:51,970] {bash_operator.py:143} INFO - Sending SIGTERM signal to bash process group
[2019-12-29 15:03:51,970] {taskinstance.py:913} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-12-29 15:03:51,982] {bash_operator.py:143} INFO - Sending SIGTERM signal to bash process group
[2019-12-29 15:03:51,982] {taskinstance.py:1078} ERROR - 'BashOperator' object has no attribute 'sub_process'
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/models/taskinstance.py", line 945, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/dstandish/code/airflow/airflow/operators/bash_operator.py", line 123, in execute
    preexec_fn=pre_exec)
  File "/Users/dstandish/.pyenv/versions/3.7.5/lib/python3.7/subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "/Users/dstandish/.pyenv/versions/3.7.5/lib/python3.7/subprocess.py", line 1505, in _execute_child
    part = os.read(errpipe_read, 5)
  File "/Users/dstandish/code/airflow/airflow/models/taskinstance.py", line 914, in signal_handler
    task_copy.on_kill()
  File "/Users/dstandish/code/airflow/airflow/operators/bash_operator.py", line 144, in on_kill
    os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
AttributeError: 'BashOperator' object has no attribute 'sub_process'
[2019-12-29 15:03:51,988] {taskinstance.py:1123} INFO - Marking task as FAILED. dag_id=test_mark_success, task_id=task1, execution_date=20160101T00, start_date=20191229T230351, end_date=20191229T230351
[2019-12-29 15:03:52,022] {helpers.py:281} INFO - Process psutil.Process(pid=57093, status='terminated') (57093) terminated with exit code 1
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
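The summary suggests guarding the attribute lookup in {{on_kill}}. A minimal sketch of that guard, using a hypothetical stand-in class rather than the actual BashOperator code:

```python
import os
import signal

class GuardedKillOperator:
    """Hypothetical stand-in illustrating the proposed fix; not the real
    BashOperator. execute() would normally set self.sub_process."""

    def on_kill(self):
        # The task can be SIGTERMed before execute() has created and stored
        # the subprocess, so look the attribute up defensively instead of
        # accessing self.sub_process directly.
        sub_process = getattr(self, 'sub_process', None)
        if sub_process is None:
            return False  # nothing to kill yet
        os.killpg(os.getpgid(sub_process.pid), signal.SIGTERM)
        return True
```

With this guard, a kill arriving before the subprocess exists becomes a no-op instead of raising `AttributeError` as in the traceback above.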
[jira] [Created] (AIRFLOW-6296) add mssql odbc hook
Daniel Standish created AIRFLOW-6296: Summary: add mssql odbc hook Key: AIRFLOW-6296 URL: https://issues.apache.org/jira/browse/AIRFLOW-6296 Project: Apache Airflow Issue Type: Bug Components: hooks Affects Versions: 1.10.7 Reporter: Daniel Standish Assignee: Daniel Standish -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6254) obscure conn extra in logs
Daniel Standish created AIRFLOW-6254: Summary: obscure conn extra in logs Key: AIRFLOW-6254 URL: https://issues.apache.org/jira/browse/AIRFLOW-6254 Project: Apache Airflow Issue Type: Bug Components: core Affects Versions: 1.10.6 Reporter: Daniel Standish Assignee: Daniel Standish Fix For: 1.10.7 When {{BaseHook.get_connection}} is called, it calls {{conn.debug_info()}} on the returned {{conn}} object. This prints the full contents of {{conn.extra}} to the log, which is problematic because there can be sensitive information in {{conn.extra}}. The present change resolves this by adding a method {{conn.log_info}} which obscures {{extra}}, and calling that in {{get_connection}} instead of {{debug_info}}. The {{debug_info}} method itself is left unchanged. -- This message was sent by Atlassian Jira (v8.3.4#803005)
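The masking idea can be sketched as a log-safe summary function. Field names and the mask string here are illustrative assumptions, not the actual Connection API:

```python
def connection_log_info(conn_id, host, extra):
    """Build a connection summary safe to log: the extra payload, which may
    contain credentials, is replaced with a fixed mask when present."""
    masked_extra = 'XXXXXXXX' if extra else None
    return 'id: {}. host: {}. extra: {}'.format(conn_id, host, masked_extra)
```

The key property is that the returned string never contains any substring of `extra`, so secrets in the connection extras cannot leak into task logs.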
[jira] [Created] (AIRFLOW-5793) add test for multiple alembic revision heads
Daniel Standish created AIRFLOW-5793: Summary: add test for multiple alembic revision heads Key: AIRFLOW-5793 URL: https://issues.apache.org/jira/browse/AIRFLOW-5793 Project: Apache Airflow Issue Type: Improvement Components: tests Affects Versions: 1.10.5 Reporter: Daniel Standish Assignee: Daniel Standish Depending on the timing of merges with migrations, we can end up with two revision heads that need to be merged. This adds a test to detect when multiple heads are present. -- This message was sent by Atlassian Jira (v8.3.4#803005)
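In Airflow's suite such a test would presumably ask alembic's ScriptDirectory for its heads; the underlying check reduces to plain data, as in this sketch (the mapping-based representation is an assumption for illustration):

```python
def revision_heads(down_revision):
    """Given a {revision: down_revision} mapping, as in an alembic migration
    chain, return the revisions no other revision descends from (the heads).
    A healthy chain has exactly one head."""
    parents = set(down_revision.values())
    return sorted(rev for rev in down_revision if rev not in parents)
```

A test asserting `len(heads) == 1` would then catch the case where two merged PRs each added a migration on top of the same parent revision.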
[jira] [Created] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database
Daniel Standish created AIRFLOW-5768: Summary: Google Cloud SQL - Don't store ephemeral connection object to database Key: AIRFLOW-5768 URL: https://issues.apache.org/jira/browse/AIRFLOW-5768 Project: Apache Airflow Issue Type: Improvement Components: gcp Affects Versions: 1.10.5 Reporter: Daniel Standish Assignee: Daniel Standish The GCP Cloud SQL operator dynamically creates an ephemeral Connection object, persists it to the metastore during execution, and deletes it afterward. This behavior has a negative impact on our ability to refactor creds management. By not persisting to the database, we can also remove some of the complexity around ensuring the connection is deleted in the event of failure, and the tests that go along with that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database
[ https://issues.apache.org/jira/browse/AIRFLOW-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5768: - Description: GCP cloud sql operator creates dynamically an ephemeral Connection object. It persists to metastore during execution and deletes afterward. This behavior has negative impact on our ability to refactor creds management. By not persisting to database, we can also remove some complexity re ensuring connection is deleted in event of failure, and the tests that go along with that. It does require that we add optional param `connection` to both MySqlHook and PostgresHook. was: GCP cloud sql operator creates dynamically an ephemeral Connection object. It persists to metastore during execution and deletes afterward. This behavior has negative impact on our ability to refactor creds management. By not persisting to database, we can also remove some complexity re ensuring connection is deleted in event of failure, and the tests that go along with that. > Google Cloud SQL - Don't store ephemeral connection object to database > -- > > Key: AIRFLOW-5768 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5768 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > GCP cloud sql operator creates dynamically an ephemeral Connection object. > It persists to metastore during execution and deletes afterward. > This behavior has negative impact on our ability to refactor creds > management. > By not persisting to database, we can also remove some complexity re ensuring > connection is deleted in event of failure, and the tests that go along with > that. > It does require that we add optional param `connection` to both MySqlHook and > PostgresHook. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database
[ https://issues.apache.org/jira/browse/AIRFLOW-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5768: - Description: GCP cloud sql operator creates dynamically an ephemeral Connection object. It persists to metastore during execution and deletes afterward. This behavior has negative impact on our ability to refactor creds management. By not persisting to database, we can also remove some complexity re ensuring connection is deleted in event of failure, and the tests that go along with that. This change requires that we add optional param `connection` to both MySqlHook and PostgresHook. was: GCP cloud sql operator creates dynamically an ephemeral Connection object. It persists to metastore during execution and deletes afterward. This behavior has negative impact on our ability to refactor creds management. By not persisting to database, we can also remove some complexity re ensuring connection is deleted in event of failure, and the tests that go along with that. It does require that we add optional param `connection` to both MySqlHook and PostgresHook. > Google Cloud SQL - Don't store ephemeral connection object to database > -- > > Key: AIRFLOW-5768 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5768 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > GCP cloud sql operator creates dynamically an ephemeral Connection object. > It persists to metastore during execution and deletes afterward. > This behavior has negative impact on our ability to refactor creds > management. > By not persisting to database, we can also remove some complexity re ensuring > connection is deleted in event of failure, and the tests that go along with > that. > This change requires that we add optional param `connection` to both > MySqlHook and PostgresHook. 
[jira] [Updated] (AIRFLOW-5751) add get_uri method to Connection object
[ https://issues.apache.org/jira/browse/AIRFLOW-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5751: - Description: Airflow can use connections stored either in the database or in URI form in environment variables. We can add a convenience method `get_uri` on `Connection` object to generate the URI for a connection. This can help users because sometimes it is a little tricky / not obvious how to generate the URI format. I think it could also be nice if each hook had a `get_uri` method that would take all relevant params and produce a correctly encoded URI. If that were implemented, it could use this function for that purpose. was: Airflow can either use connections stored in database stored in URI form in environment variables. We can add a convenience method `get_uri` on `Connection` object to generate the URI for a connection. This can help users because sometimes it is a little tricky / not obvious how to generate the URI format. > add get_uri method to Connection object > --- > > Key: AIRFLOW-5751 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5751 > Project: Apache Airflow > Issue Type: New Feature > Components: core >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > Airflow can use connections stored either in the database or in URI form in > environment variables. > We can add a convenience method `get_uri` on `Connection` object to generate > the URI for a connection. > This can help users because sometimes it is a little tricky / not obvious how > to generate the URI format. > I think it could also be nice if each hook had a `get_uri` method that would > take all relevant params and produce a correctly encoded URI. If that were > implemented, it could use this function for that purpose. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-5752) flakey slack build message test
Daniel Standish created AIRFLOW-5752: Summary: flakey slack build message test Key: AIRFLOW-5752 URL: https://issues.apache.org/jira/browse/AIRFLOW-5752 Project: Apache Airflow Issue Type: Test Components: tests Affects Versions: 1.10.5 Reporter: Daniel Standish On Python 3.5, the test test_build_slack_message sometimes fails because of indeterminate dict ordering. We can resolve this by loading the JSON message into a dict and using `assertDictEqual`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
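The proposed fix can be sketched as follows; the class name and payload are illustrative stand-ins, not the real test or Slack message:

```python
import json
import unittest

class TestBuildSlackMessage(unittest.TestCase):
    """Illustrative version of the order-independent assertion."""

    def test_build_slack_message(self):
        # Stand-in for the JSON string the operator would build.
        built = json.dumps({'channel': '#build', 'text': 'build passed'})
        expected = {'text': 'build passed', 'channel': '#build'}
        # Parsing the message back to a dict makes the comparison independent
        # of key ordering, which was the source of the flakiness on 3.5.
        self.assertDictEqual(json.loads(built), expected)
```

Comparing raw JSON strings byte for byte fails whenever serialization order differs; comparing parsed dicts does not.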
[jira] [Updated] (AIRFLOW-5751) add get_uri method to Connection object
[ https://issues.apache.org/jira/browse/AIRFLOW-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5751: - Summary: add get_uri method to Connection object (was: add get_uri method to connections object) > add get_uri method to Connection object > --- > > Key: AIRFLOW-5751 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5751 > Project: Apache Airflow > Issue Type: New Feature > Components: core >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > Airflow can either use connections stored in database stored in URI form in > environment variables. > We can add a convenience method `get_uri` on `Connection` object to generate > the URI for a connection. > This can help users because sometimes it is a little tricky / not obvious how > to generate the URI format. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-5751) add get_uri method to connections object
Daniel Standish created AIRFLOW-5751: Summary: add get_uri method to connections object Key: AIRFLOW-5751 URL: https://issues.apache.org/jira/browse/AIRFLOW-5751 Project: Apache Airflow Issue Type: New Feature Components: core Affects Versions: 1.10.5 Reporter: Daniel Standish Assignee: Daniel Standish Airflow can use connections stored either in the database or in URI form in environment variables. We can add a convenience method `get_uri` on the `Connection` object to generate the URI for a connection. This can help users because it is sometimes a little tricky / not obvious how to generate the URI format. -- This message was sent by Atlassian Jira (v8.3.4#803005)
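The tricky part is percent-encoding reserved characters in the credentials. A minimal sketch of the idea; the field handling in Airflow's real `Connection.get_uri` differs in details, so treat the signature and formatting here as assumptions:

```python
from urllib.parse import quote, urlencode

def connection_get_uri(conn_type, login, password, host, port, schema, extra=None):
    """Build an Airflow-style connection URI from its parts, encoding
    reserved characters (e.g. @ and /) in login and password."""
    uri = '{}://{}:{}@{}:{}/{}'.format(
        conn_type,
        quote(str(login), safe=''),
        quote(str(password), safe=''),
        host, port, schema or '')
    if extra:
        # Extras become query parameters.
        uri += '?' + urlencode(extra)
    return uri
```

This is exactly the step users tend to get wrong by hand: an unencoded `@` in the password makes the URI parse the wrong host.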
[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5720: - Description: Issues with this test class:

*tests are mocking lower-level than they need to*

Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(
    default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
        'extra__google_cloud_platform__project')
)
{code}
{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
    gcp_connection = mock.MagicMock()
    gcp_connection.extra_dejson = mock.MagicMock()
    gcp_connection.extra_dejson.get.return_value = 'empty_project'
    cloudsql_connection = Connection()
    cloudsql_connection.parse_from_uri(uri)
    cloudsql_connection2 = Connection()
    cloudsql_connection2.parse_from_uri(uri)
    get_connections.side_effect = [[gcp_connection],
                                   [cloudsql_connection],
                                   [cloudsql_connection2]]
{code}
Issues here are as follows.

1. no test ever used the third side effect
2. the first side effect does not help us; {{default_gcp_project_id}} is irrelevant

Only one of the three connections in {{_setup_connections}} has any impact on the test. The call of {{BaseHook.get_connection}} only serves to discard the first connection in the mock side effect list, {{gcp_connection}}. The second connection is the one that matters, and it is returned when {{CloudSqlDatabaseHook}} calls `self.get_connection` during init. Since it is a mock side effect, it doesn't matter what value is given for conn_id. So the {{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no consequence. And because it has no consequence, we should not supply a value for it because this is misleading. We should not have extra code that does not serve a purpose because it makes it harder to understand what's actually happening.

was: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. the first side effect does not help us; {{default_gcp_project_id}} is irrelevant Only one of the three connections in {{_setup_connections}} has any impact on the test. The call of {{BaseHook.get_connection}} only serves to discard the first connection in mock side effect list, {{gcp_connection}}. 
The second connection is the one that matters, and it is returned when {{CloudSqlDatabaseHook}} calls `self.get_connection` during init. Since it is a mock side effect, it doesn't matter what value is given for conn_id. So the {{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no consequence. And because it has no consequence, we should not supply a value for it because this is misleading. > don't call _get_connections_from_db in TestCloudSqlDatabaseHook > --- > > Key: AIRFLOW-5720 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5720 > Project: Apache Airflow >
[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5720: - Description: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. the first side effect does not help us; {{default_gcp_project_id}} is irrelevant Only one of the three connections in {{_setup_connections}} has any impact on the test. The call of {{BaseHook.get_connection}} only serves to discard the first connection in mock side effect list, {{gcp_connection}}. The second connection is the one that matters, and it is returned when {{CloudSqlDatabaseHook}} calls `self.get_connection` during init. Since it is a mock side effect, it doesn't matter what value is given for conn_id. 
So the {{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no consequence. And because it has no consequence, we should not supply a value for it because this is misleading. was: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. the first side effect does not help us; {{default_gcp_project_id}} is irrelevant. All this line serves to accomplish is to discard the first connection, {{gcp_connection}}. This is the first invocation of {{BaseHook.get_connection}}. The second invocation is the one that matters, namely when {{CloudSqlDatabaseHook}} calls `self.get_connection`. This is when {{cloudsql_connection}} is returned. But since it is a mock side effect, it doesn't matter what value you give for conn_id. 
So the param {{default_gcp_project_id}} has no consequence. And because it has no consequence, we should not supply a value for it because this is misleading. > don't call _get_connections_from_db in TestCloudSqlDatabaseHook > --- > > Key: AIRFLOW-5720 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5720 > Project: Apache Airflow > Issue Type: New Feature > Components: gcp >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority:
[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5720: - Description: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. the first side effect does not help us; {{default_gcp_project_id}} is irrelevant. All this line serves to accomplish is to discard the first connection, {{gcp_connection}}. This is the first invocation of {{BaseHook.get_connection}}. The second invocation is the one that matters, namely when {{CloudSqlDatabaseHook}} calls `self.get_connection`. This is when {{cloudsql_connection}} is returned. But since it is a mock side effect, it doesn't matter what value you give for conn_id. So the param {{default_gcp_project_id}} has no consequence. 
And because it has no consequence, we should not supply a value for it because this is misleading. was: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. this line: {{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}} All this line serves to accomplish is to discard the first connection, {{gcp_connection}}. This is the first invocation of {{BaseHook.get_connection}}. The second invocation is the one that matters, namely when {{CloudSqlDatabaseHook}} calls `self.get_connection`. This is when {{cloudsql_connection}} is returned. But since it is a mock side effect, it doesn't matter what value you give for conn_id. So the param {{default_gcp_project_id}} has no consequence. 
And because it has no consequence, we should not supply a value for it because this is misleading. > don't call _get_connections_from_db in TestCloudSqlDatabaseHook > --- > > Key: AIRFLOW-5720 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5720 > Project: Apache Airflow > Issue Type: New Feature > Components: gcp >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > Issues with this test class: > *tests are
[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5720: - Description: Issues with this test class: *tests are mocking lower-level than they need to* Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is more direct. *should not reference private method* This is an impediment to refactoring of connections / creds. *Tests had complexity that did not add a benefit* They all had this bit: {code:python} self._setup_connections(get_connections, uri) gcp_conn_id = 'google_cloud_default' hook = CloudSqlDatabaseHook( default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( 'extra__google_cloud_platform__project') ) {code} {{_setup_connections}} was like this: {code:python} @staticmethod def _setup_connections(get_connections, uri): gcp_connection = mock.MagicMock() gcp_connection.extra_dejson = mock.MagicMock() gcp_connection.extra_dejson.get.return_value = 'empty_project' cloudsql_connection = Connection() cloudsql_connection.parse_from_uri(uri) cloudsql_connection2 = Connection() cloudsql_connection2.parse_from_uri(uri) get_connections.side_effect = [[gcp_connection], [cloudsql_connection], [cloudsql_connection2]] {code} Issues here are as follows. 1. no test ever used the third side effect 2. this line: {{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}} All this line serves to accomplish is to discard the first connection, {{gcp_connection}}. This is the first invocation of {{BaseHook.get_connection}}. The second invocation is the one that matters, namely when {{CloudSqlDatabaseHook}} calls `self.get_connection`. This is when {{cloudsql_connection}} is returned. But since it is a mock side effect, it doesn't matter what value you give for conn_id. So the param {{default_gcp_project_id}} has no consequence. 
And because it has no consequence, we should not supply a value for it because this is misleading. was: Test should not reference "private" methods. This impedes refactoring. Also some other issues with these tests. More detail TBD > don't call _get_connections_from_db in TestCloudSqlDatabaseHook > --- > > Key: AIRFLOW-5720 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5720 > Project: Apache Airflow > Issue Type: New Feature > Components: gcp >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > Issues with this test class: > *tests are mocking lower-level than they need to* > Tests were mocking {{airflow.hook.BaseHook.get_connections}}. > Instead they can mock > {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is > more direct. > *should not reference private method* > This is an impediment to refactoring of connections / creds. > *Tests had complexity that did not add a benefit* > They all had this bit: > {code:python} > self._setup_connections(get_connections, uri) > gcp_conn_id = 'google_cloud_default' > hook = CloudSqlDatabaseHook( > > default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get( > 'extra__google_cloud_platform__project') > ) > {code} > {{_setup_connections}} was like this: > {code:python} > @staticmethod > def _setup_connections(get_connections, uri): > gcp_connection = mock.MagicMock() > gcp_connection.extra_dejson = mock.MagicMock() > gcp_connection.extra_dejson.get.return_value = 'empty_project' > cloudsql_connection = Connection() > cloudsql_connection.parse_from_uri(uri) > cloudsql_connection2 = Connection() > cloudsql_connection2.parse_from_uri(uri) > get_connections.side_effect = [[gcp_connection], > [cloudsql_connection], >[cloudsql_connection2]] > {code} > Issues here are as follows. > 1. no test ever used the third side effect > 2. 
this line: > {{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}} > > All this line serves to accomplish is to discard the first connection, > {{gcp_connection}}. This is the first invocation of > {{BaseHook.get_connection}}. > The second invocation is the one that matters, namely when > {{CloudSqlDatabaseHook}} calls `self.get_connection`. > This is when {{cloudsql_connection}} is returned. But since it is a mock > side effect, it doesn't matter what value you
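The more direct mocking the description calls for can be sketched as follows. The `Connection` and `CloudSqlDatabaseHook` classes below are simplified stand-ins to illustrate the pattern, not the real Airflow classes, and the URI is illustrative:

```python
from unittest import mock

# Stand-ins for airflow's Connection and CloudSqlDatabaseHook, purely to
# illustrate the mocking pattern described above; not the real classes.
class Connection:
    def __init__(self, uri):
        self.uri = uri

class CloudSqlDatabaseHook:
    def __init__(self, gcp_cloudsql_conn_id='google_cloud_sql_default'):
        # the real hook resolves its connection via get_connection
        self.cloudsql_connection = self.get_connection(gcp_cloudsql_conn_id)

    @classmethod
    def get_connection(cls, conn_id):
        raise NotImplementedError("the real hook reads the metadata DB")

# Patch get_connection on the hook class directly, rather than the
# lower-level BaseHook.get_connections: no unused side effects, no
# references to private methods, no dummy default_gcp_project_id.
uri = 'gcpcloudsql://user:pass@host:3306/db'  # illustrative URI
with mock.patch.object(CloudSqlDatabaseHook, 'get_connection',
                       return_value=Connection(uri)):
    hook = CloudSqlDatabaseHook()

assert hook.cloudsql_connection.uri == uri
```

With `return_value` instead of a list of `side_effect`s, the test no longer needs to discard a first connection or set up effects that are never consumed.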
[jira] [Created] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook
Daniel Standish created AIRFLOW-5720: Summary: don't call _get_connections_from_db in TestCloudSqlDatabaseHook Key: AIRFLOW-5720 URL: https://issues.apache.org/jira/browse/AIRFLOW-5720 Project: Apache Airflow Issue Type: New Feature Components: gcp Affects Versions: 1.10.5 Reporter: Daniel Standish Assignee: Daniel Standish Test should not reference "private" methods. This impedes refactoring. Also some other issues with these tests. More detail TBD -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5705) add option for alternative creds backend
[ https://issues.apache.org/jira/browse/AIRFLOW-5705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-5705: - Description: Idea here is to create some kind of generic creds backend that could support using other creds stores such as AWS SSM parameter store. was: Idea hear is to create some kind of generic creds backend that could support using other creds stores such as AWS SSM parameter store. > add option for alternative creds backend > > > Key: AIRFLOW-5705 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5705 > Project: Apache Airflow > Issue Type: New Feature > Components: core >Affects Versions: 1.10.5 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Major > > Idea here is to create some kind of generic creds backend that could support > using other creds stores such as AWS SSM parameter store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-5705) add option for alternative creds backend
Daniel Standish created AIRFLOW-5705: Summary: add option for alternative creds backend Key: AIRFLOW-5705 URL: https://issues.apache.org/jira/browse/AIRFLOW-5705 Project: Apache Airflow Issue Type: New Feature Components: core Affects Versions: 1.10.5 Reporter: Daniel Standish Assignee: Daniel Standish Idea here is to create some kind of generic creds backend that could support using other creds stores such as AWS SSM parameter store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
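A minimal sketch of what such a generic creds backend could look like. All names here (`BaseCredsBackend`, `get_conn_uri`, `DictCredsBackend`) are hypothetical illustrations, not an existing Airflow API:

```python
from abc import ABC, abstractmethod

# Hypothetical pluggable creds backend interface, as the issue proposes.
class BaseCredsBackend(ABC):
    @abstractmethod
    def get_conn_uri(self, conn_id: str) -> str:
        """Return the connection URI for conn_id, or raise KeyError."""

class DictCredsBackend(BaseCredsBackend):
    """Trivial backend reading from a dict; a real implementation could
    instead query a store such as AWS SSM Parameter Store."""
    def __init__(self, store):
        self.store = store

    def get_conn_uri(self, conn_id):
        return self.store[conn_id]

backend = DictCredsBackend({'my_db': 'postgresql://user:pass@host/db'})
assert backend.get_conn_uri('my_db') == 'postgresql://user:pass@host/db'
```

The point of the abstract base is that core code can resolve connections through one interface while the actual store (metadata DB, env vars, SSM) stays swappable.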
[jira] [Created] (AIRFLOW-4799) tests using bash operator fail flakily because of jinja2 rendering of environment variables
Daniel Standish created AIRFLOW-4799: Summary: tests using bash operator fail flakily because of jinja2 rendering of environment variables Key: AIRFLOW-4799 URL: https://issues.apache.org/jira/browse/AIRFLOW-4799 Project: Apache Airflow Issue Type: Bug Components: core Affects Versions: 2.0.0 Reporter: Daniel Standish Assignee: Daniel Standish In test_retry_delay in the task instance tests we see this:
{code:python}
ti = TI(
    task=task,
    execution_date=timezone.utcnow())
self.assertEqual(ti.try_number, 1)
# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.try_number, 2)
# second run -- still up for retry because retry_delay hasn't expired
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
# third run -- failed
time.sleep(3)
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)
{code}
The same TI is re-run multiple times. The problem is that in the execute method of BashOperator, the {{env}} attribute is modified and updated to include a copy of the parent bash env. Then when this TI is executed another time, it again tries to render {{env}}, because it is a templated parameter, and it will attempt to load every {{.sh}} or {{.bash}} file found in `env` as a template. If one such file does not exist, the test will fail with a template-not-found error. The modification of env in the execute method should be local to that method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
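The proposed fix can be sketched as follows, using an illustrative stand-in class rather than the real BashOperator: merge the parent environment into a local variable inside execute() so that self.env, the templated attribute, is never mutated between runs:

```python
import os

# Illustrative stand-in, not the real BashOperator. The key point is
# that execute() builds the merged environment in a local variable
# instead of assigning it back to self.env, so a re-run never
# re-templates the inherited os.environ contents.
class SketchBashOperator:
    template_fields = ('env',)

    def __init__(self, env=None):
        self.env = env  # only user-supplied values; stays small and stable

    def execute(self):
        env = self.env
        if env is None:
            env = os.environ.copy()
        else:
            env = {**os.environ, **env}  # local merge; self.env untouched
        return env

op = SketchBashOperator(env={'FOO': 'bar'})
op.execute()
op.execute()  # a second run sees the same self.env as the first
assert op.env == {'FOO': 'bar'}
```

Because `self.env` is unchanged after `execute()`, re-rendering the templated field on a retry only ever sees the values the user actually supplied.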
[jira] [Created] (AIRFLOW-4798) tests may reference dagbag objects that do not yet exist
Daniel Standish created AIRFLOW-4798: Summary: tests may reference dagbag objects that do not yet exist Key: AIRFLOW-4798 URL: https://issues.apache.org/jira/browse/AIRFLOW-4798 Project: Apache Airflow Issue Type: Bug Components: core Affects Versions: 2.0.0 Reporter: Daniel Standish Assignee: Daniel Standish For example, the TaskInstance test `test_depends_on_past` tries to grab a dag 'test_depends_on_past' but it is not there. It is created in `test_scheduler_job.py`, so if you don't run that before the task instance tests you will get a failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-4788) prev_execution_date is not always pendulum.datetime class
[ https://issues.apache.org/jira/browse/AIRFLOW-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4788: - Description: Despite documentation on macros page, previous execution dates are in general not pendulum type. For one, when reading from database, UtcDateTime returns native datetime type. Also dag.previous_schedule returns datetime type. So, in general, `prev_execution_date` and `ti.previous_ti.execution_date` may be non-pendulum. (there are edge cases when the context var prev_* is pendulum e.g. when there is no DR or no schedule interval or manually triggered, but in general, no.) The problem is, this leads to errors and confusion when using these fields in templating, when you expect it to be pendulum but it isn't. There are a few things to consider: # make UtcDateTime sqlalchemy type return pendulum # make execution date a property of TaskInstance with appropriate getter returning pendulum. # Change dag.previous_schedule to return pendulum was: In certain circumstances previous execution dates may not be pendulum type. For one, when reading from database, UtcDateTime returns native datetime type. Also dag.previous_schedule returns datetime type. So, depending on circumstances, `prev_execution_date` and `ti.previous_ti.execution_date` may be non-pendulum. The problem is, this leads to errors and confusion when using these fields in templating, when you expect it to be pendulum but it isn't. There are a few things to consider: # make UtcDateTime sqlalchemy type return pendulum # make execution date a property of TaskInstance with appropriate getter returning pendulum. 
# Change dag.previous_schedule to return pendulum > prev_execution_date is not always pendulum.datetime class > - > > Key: AIRFLOW-4788 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4788 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.3 >Reporter: Daniel Standish >Priority: Major > > Despite documentation on macros page, previous execution dates are in general > not pendulum type. > For one, when reading from database, UtcDateTime returns native datetime type. > Also dag.previous_schedule returns datetime type. > So, in general, `prev_execution_date` and `ti.previous_ti.execution_date` may > be non-pendulum. > (there are edge cases when the context var prev_* is pendulum e.g. when there > is no DR or no schedule interval or manually triggered, but in general, no.) > The problem is, this leads to errors and confusion when using these fields in > templating, when you expect it to be pendulum but it isn't. > There are a few things to consider: > # make UtcDateTime sqlalchemy type return pendulum > # make execution date a property of TaskInstance with appropriate getter > returning pendulum. > # Change dag.previous_schedule to return pendulum > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-4788) prev_execution_date is not always pendulum.datetime class
Daniel Standish created AIRFLOW-4788: Summary: prev_execution_date is not always pendulum.datetime class Key: AIRFLOW-4788 URL: https://issues.apache.org/jira/browse/AIRFLOW-4788 Project: Apache Airflow Issue Type: Bug Components: core Affects Versions: 1.10.3 Reporter: Daniel Standish In certain circumstances previous execution dates may not be pendulum type. For one, when reading from the database, UtcDateTime returns the native datetime type. Also, dag.previous_schedule returns datetime type. So, depending on circumstances, `prev_execution_date` and `ti.previous_ti.execution_date` may be non-pendulum. The problem is that this leads to errors and confusion when using these fields in templating, when you expect pendulum but get datetime. There are a few things to consider:
# make UtcDateTime sqlalchemy type return pendulum
# make execution_date a property of TaskInstance with an appropriate getter returning pendulum
# change dag.previous_schedule to return pendulum
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
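Option 2 above (a property getter that normalizes the date) can be sketched with stdlib types only. `SketchTaskInstance` is illustrative, and where this sketch attaches UTC tzinfo, the actual proposal would convert to a pendulum instance:

```python
from datetime import datetime, timezone

# Stdlib-only sketch of the "property getter" idea: whatever the DB
# layer handed back, the getter normalizes it before templating sees
# it. A real implementation might return pendulum.instance(dt) here;
# this sketch only attaches UTC tzinfo to stay dependency-free.
class SketchTaskInstance:
    def __init__(self, execution_date):
        self._execution_date = execution_date

    @property
    def execution_date(self):
        dt = self._execution_date
        if dt.tzinfo is None:  # e.g. a naive UTC datetime from the DB
            dt = dt.replace(tzinfo=timezone.utc)
        return dt

ti = SketchTaskInstance(datetime(2019, 6, 1, 12, 0))  # naive, as from DB
assert ti.execution_date.tzinfo is timezone.utc
```

The same normalization point would cover both the UtcDateTime round-trip and `dag.previous_schedule`, since every consumer reads through the property.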
[jira] [Created] (AIRFLOW-4787) clicking task status in dag view should take you to TIs of last run only
Daniel Standish created AIRFLOW-4787: Summary: clicking task status in dag view should take you to TIs of last run only Key: AIRFLOW-4787 URL: https://issues.apache.org/jira/browse/AIRFLOW-4787 Project: Apache Airflow Issue Type: Improvement Components: ui Affects Versions: 1.10.3 Reporter: Daniel Standish In the dags view, when you click on the tasks from the last run (e.g. the red circle indicating failed tasks), it currently takes you to TIs but includes _all_ failed tasks for that dag. If your dag has a lot of tasks, this makes it harder to clear them: you have to select them all individually and be careful not to select tasks from a prior run. Following this change, you would initially only see the tasks from the last run (consistent with the number represented in the dag view that you clicked on). And if you want to see all tasks you can just drop the execution date filter. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run
[ https://issues.apache.org/jira/browse/AIRFLOW-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish resolved AIRFLOW-4756. -- Resolution: Fixed Fix Version/s: 1.10.4 merged into master [https://github.com/apache/airflow/pull/5399] > gantt chart view fails after clearing failed task in current run > > > Key: AIRFLOW-4756 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4756 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.3 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Minor > Fix For: 1.10.4 > > > To repro: > Make some dag with a number of decently long-running tasks (so you have time > to do this). > Get the dag running > Make sure one of the tasks fails. > Before the others complete, clear the failing task. > View gantt chart. > Observer error like so: > {code} > Traceback (most recent call last): > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 2292, in wsgi_app > response = self.full_dispatch_request() > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1815, in full_dispatch_request > rv = self.handle_user_exception(e) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1718, in handle_user_exception > reraise(exc_type, exc_value, tb) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py", > line 35, in reraise > raise value > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1813, in full_dispatch_request > rv = self.dispatch_request() > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1799, in dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 69, in inner > 
return self._run_view(f, *args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 368, in _run_view > return fn(self, *args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py", > line 258, in decorated_view > return func(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py", > line 275, in wrapper > return f(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py", > line 73, in wrapper > return func(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py", > line 2015, in gantt > root=root, > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 308, in render > return render_template(template, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", > line 135, in render_template > context, ctx.app) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", > line 117, in _render > rv = template.render(context) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py", > line 76, in render > return original_render(self, *args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", > line 1008, in render > return self.environment.handle_exception(exc_info, True) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", > line 780, in handle_exception > reraise(exc_type, exc_value, tb) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py", > line 37, in reraise > raise value.with_traceback(tb) > File > 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html", > line 18, in top-level template code > {% extends "airflow/dag.html" %} > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html", > line 19, in top-level template code > {% import 'admin/lib.html' as lib with context %} > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", > line 18, in top-level template code > {% extends "admin/master.html" %} > File >
[jira] [Assigned] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run
[ https://issues.apache.org/jira/browse/AIRFLOW-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish reassigned AIRFLOW-4756: Assignee: Daniel Standish > gantt chart view fails after clearing failed task in current run > > > Key: AIRFLOW-4756 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4756 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.3 >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Minor > > To repro: > Make some dag with a number of decently long-running tasks (so you have time > to do this). > Get the dag running > Make sure one of the tasks fails. > Before the others complete, clear the failing task. > View gantt chart. > Observer error like so: > {code} > Traceback (most recent call last): > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 2292, in wsgi_app > response = self.full_dispatch_request() > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1815, in full_dispatch_request > rv = self.handle_user_exception(e) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1718, in handle_user_exception > reraise(exc_type, exc_value, tb) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py", > line 35, in reraise > raise value > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1813, in full_dispatch_request > rv = self.dispatch_request() > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", > line 1799, in dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 69, in inner > return self._run_view(f, *args, **kwargs) > File > 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 368, in _run_view > return fn(self, *args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py", > line 258, in decorated_view > return func(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py", > line 275, in wrapper > return f(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py", > line 73, in wrapper > return func(*args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py", > line 2015, in gantt > root=root, > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", > line 308, in render > return render_template(template, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", > line 135, in render_template > context, ctx.app) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", > line 117, in _render > rv = template.render(context) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py", > line 76, in render > return original_render(self, *args, **kwargs) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", > line 1008, in render > return self.environment.handle_exception(exc_info, True) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", > line 780, in handle_exception > reraise(exc_type, exc_value, tb) > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py", > line 37, in reraise > raise value.with_traceback(tb) > File > 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html", > line 18, in top-level template code > {% extends "airflow/dag.html" %} > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html", > line 19, in top-level template code > {% import 'admin/lib.html' as lib with context %} > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", > line 18, in top-level template code > {% extends "admin/master.html" %} > File > "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", > line 18, in top-level template
[jira] [Created] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run
Daniel Standish created AIRFLOW-4756: Summary: gantt chart view fails after clearing failed task in current run Key: AIRFLOW-4756 URL: https://issues.apache.org/jira/browse/AIRFLOW-4756 Project: Apache Airflow Issue Type: Bug Components: ui Affects Versions: 1.10.3 Reporter: Daniel Standish To repro:
Make some dag with a number of decently long-running tasks (so you have time to do this).
Get the dag running.
Make sure one of the tasks fails.
Before the others complete, clear the failing task.
View gantt chart.
Observe an error like so:
{code} Traceback (most recent call last): File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", line 2292, in wsgi_app response = self.full_dispatch_request() File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", line 1815, in full_dispatch_request rv = self.handle_user_exception(e) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", line 1718, in handle_user_exception reraise(exc_type, exc_value, tb) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py", line 35, in reraise raise value File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", line 1813, in full_dispatch_request rv = self.dispatch_request() File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py", line 1799, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner return self._run_view(f, *args, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view return fn(self, *args, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view return func(*args, **kwargs) File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py", line 275, in wrapper return f(*args, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py", line 2015, in gantt root=root, File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py", line 308, in render return render_template(template, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", line 135, in render_template context, ctx.app) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py", line 117, in _render rv = template.render(context) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py", line 76, in render return original_render(self, *args, **kwargs) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", line 1008, in render return self.environment.handle_exception(exc_info, True) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py", line 780, in handle_exception reraise(exc_type, exc_value, tb) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py", line 37, in reraise raise value.with_traceback(tb) File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html", line 18, in top-level template code {% extends "airflow/dag.html" %} File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html", line 19, in top-level template code {% import 'admin/lib.html' as lib with context %} File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", 
line 18, in top-level template code {% extends "admin/master.html" %} File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 18, in top-level template code {% extends 'admin/base.html' %} File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 94, in top-level template code {% block tail %} File "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html", line 58, in block "tail" data = {{ data |tojson|safe }}; File
[jira] [Commented] (AIRFLOW-4298) Stop Scheduler warning repeatedly about "connection invalidated"
[ https://issues.apache.org/jira/browse/AIRFLOW-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851561#comment-16851561 ] Daniel Standish commented on AIRFLOW-4298: -- [~ash] [~gcuriel] yes this is from airflow not sqlalchemy. one solution could be to use {{log.debug}} on first reconnect, and only use {{log.warning}} if {{backoff > initial_backoff_seconds}} -- i.e. if it has to retry. it seems like it always reconnects on first try. of course it would be nice to know what is causing this and resolve that instead but i am not sure how to figure that out. and it does not seem to present a material problem. > Stop Scheduler warning repeatedly about "connection invalidated" > > > Key: AIRFLOW-4298 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4298 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Affects Versions: 1.10.3 > Environment: Main host with airflow services: Cent-OS 7, Python 3.6.1. > DB - official Docker image with Postgresql 11.2-alpine >Reporter: Anton Cherkasov >Priority: Minor > > I have some strange issue with scheduler after upgrade to 1.10.3 from 1.10.2. > DAG tasks runs only once. After that scheduler logs looks like this: > {noformat} > Apr 11 09:21:33 airflow.infra airflow[32739]: [2019-04-11 09:21:33,094] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... > Apr 11 09:21:44 airflow.infra airflow[32739]: [2019-04-11 09:21:44,105] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... > Apr 11 09:21:55 airflow.infra airflow[32739]: [2019-04-11 09:21:55,114] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... > Apr 11 09:22:06 airflow.infra airflow[32739]: [2019-04-11 09:22:06,123] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... > Apr 11 09:22:17 airflow.infra airflow[32739]: [2019-04-11 09:22:17,131] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... 
> Apr 11 09:22:28 airflow.infra airflow[32739]: [2019-04-11 09:22:28,143] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... > {noformat} > Logs from Scheduler with *DEBUG* level: > {noformat} > Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] > {{settings.py:154}} DEBUG - Setting up DB connection pool (PID 17447) > Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] > {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. > pool_size=100, pool_recycle=3600, pid=17447 > Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,450] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17449) > Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,535] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17448) > Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,706] > {{jobs.py:1663}} DEBUG - Sleeping for 1.00 seconds to prevent excessive > logging > . . . > Apr 11 09:01:29 airflow.infra airflow[17403]: [2019-04-11 09:01:29,884] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21866) > Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,492] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21865) > Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,785] > {{jobs.py:496}} DEBUG - Waiting for stopped)> > Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,786] > {{jobs.py:496}} DEBUG - Waiting for stopped)> > Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,790] > {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting... 
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,765] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21910) > Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,786] > {{jobs.py:496}} DEBUG - Waiting for stopped)> > Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,866] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21909) > Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,468] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21921) > Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] > {{jobs.py:496}} DEBUG - Waiting for stopped)> > Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] > {{jobs.py:496}} DEBUG - Waiting for stopped)> > Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,358] > {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21933) > Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,521] > {{settings.py:206}} DEBUG - Disposing
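The log-level suggestion from the comment above might look roughly like this. The function and variable names (`log_reconnect`, `backoff`, `initial_backoff_seconds`) follow the comment's wording and are illustrative, not the actual Airflow code:

```python
import logging

log = logging.getLogger(__name__)

# Sketch of the suggestion: log the first reconnect at DEBUG and only
# escalate to WARNING when the backoff has grown past its initial
# value, i.e. when at least one reconnect attempt actually failed.
def log_reconnect(backoff, initial_backoff_seconds):
    msg = "DB connection invalidated. Reconnecting..."
    if backoff > initial_backoff_seconds:
        log.warning(msg)   # a retry was needed; worth surfacing
        return 'warning'
    log.debug(msg)         # routine first reconnect; keep logs quiet
    return 'debug'

assert log_reconnect(1.0, 1.0) == 'debug'
assert log_reconnect(2.0, 1.0) == 'warning'
```

Since the scheduler appears to reconnect successfully on the first try every heartbeat, this would silence the repeated warnings without hiding genuine retry loops.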
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134: - Summary: "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC (was: "DB connection invalidated" every JOB_HEARTBEAT_SEC) > "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC > -- > > Key: AIRFLOW-4134 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4134 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: 1.10.2 >Reporter: Daniel Standish >Priority: Major > > I am finding with 1.10.2 that I seem to get a warning {{DB connection > invalidated. Reconnecting...}} very frequently. It seems to coincide closely > with my job_heartbeat_sec parameter (5 seconds). > I have tried to diagnose I added logging of the triggering error on line 79 > in airflow/utils/sqlalchemy.py, from which this warning is generated. > Looks like it is related to zombie check. > Call stack: > {code} > Call stack: > File "/usr/local/bin/airflow", line 32, in > args.func(args) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line > 74, in wrapper > return f(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, > in scheduler > job.run() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in > run > self._execute() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, > in _execute > self._execute_helper() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, > in _execute_helper > self.processor_agent.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 511, in start > self._async_mode) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 565, in _launch_process > p.start() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in > start > self._popen = self._Popen(self) 
> File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in > _Popen > return _default_context.get_context().Process._Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in > _Popen > return Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in > __init__ > self._launch(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in > _launch > code = process_obj._bootstrap() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in > _bootstrap > self.run() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run > self._target(*self._args, **self._kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 560, in helper > processor_manager.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 797, in start > self.start_in_async() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 820, in start_in_async > simple_dags = self.heartbeat() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1190, in heartbeat > zombies = self._find_zombies() > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, > in wrapper > return func(*args, **kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1236, in _find_zombies > LJ.latest_heartbeat < limit_dttm, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 2925, in all > return list(self) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3081, in __iter__ > return self._execute_and_instances(context) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3103, in _execute_and_instances > querycontext, self._connection_from_session, close_with_result=True > File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3111, in _get_bind_args > mapper=self._bind_mapper(), clause=querycontext.statement, **kw > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3096, in _connection_from_session > conn = self.session.connection(**kw) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1120, in connection > execution_options=execution_options, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1126, in _connection_for_bind > engine, execution_options > File
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" polluting scheduler log
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134:
- Description:
I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. It seems to coincide closely with my job_heartbeat_sec parameter (5 seconds).
To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
Looks like it is related to the zombie check.
Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute
    self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper
    self.processor_agent.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start
    self._async_mode)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process
    p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper
    processor_manager.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start
    self.start_in_async()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async
    simple_dags = self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat
    zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies
    LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all
    return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection
    execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind
    engine, execution_options
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 424, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2194, in contextual_connect
    **kwargs
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 125, in __init__
    self.dispatch.engine_connect(self, self.__branch)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 297, in __call__
    fn(*args, **kw)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 79, in ping_connection
    log.warning("DB connection invalidated. Reconnecting...", err)
Message: 'DB connection
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" every JOB_HEARTBEAT_SEC
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134: - Summary: "DB connection invalidated" every JOB_HEARTBEAT_SEC (was: "DB connection invalidated" polluting scheduler log) > "DB connection invalidated" every JOB_HEARTBEAT_SEC > --- > > Key: AIRFLOW-4134 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4134 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: 1.10.2 >Reporter: Daniel Standish >Priority: Major > > I am finding with 1.10.2 that I seem to get a warning {{DB connection > invalidated. Reconnecting...}} very frequently. It seems to coincide closely > with my job_heartbeat_sec parameter (5 seconds). > I have tried to diagnose I added logging of the triggering error on line 79 > in airflow/utils/sqlalchemy.py, from which this warning is generated. > Looks like it is related to zombie check. > Call stack: > {code} > Call stack: > File "/usr/local/bin/airflow", line 32, in > args.func(args) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line > 74, in wrapper > return f(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, > in scheduler > job.run() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in > run > self._execute() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, > in _execute > self._execute_helper() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, > in _execute_helper > self.processor_agent.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 511, in start > self._async_mode) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 565, in _launch_process > p.start() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in > start > self._popen = self._Popen(self) > File 
"/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in > _Popen > return _default_context.get_context().Process._Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in > _Popen > return Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in > __init__ > self._launch(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in > _launch > code = process_obj._bootstrap() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in > _bootstrap > self.run() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run > self._target(*self._args, **self._kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 560, in helper > processor_manager.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 797, in start > self.start_in_async() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 820, in start_in_async > simple_dags = self.heartbeat() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1190, in heartbeat > zombies = self._find_zombies() > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, > in wrapper > return func(*args, **kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1236, in _find_zombies > LJ.latest_heartbeat < limit_dttm, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 2925, in all > return list(self) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3081, in __iter__ > return self._execute_and_instances(context) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3103, in _execute_and_instances > querycontext, self._connection_from_session, close_with_result=True > File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3111, in _get_bind_args > mapper=self._bind_mapper(), clause=querycontext.statement, **kw > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3096, in _connection_from_session > conn = self.session.connection(**kw) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1120, in connection > execution_options=execution_options, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1126, in _connection_for_bind > engine, execution_options > File
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134:
- Description:
I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently.
To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute
    self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper
    self.processor_agent.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start
    self._async_mode)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process
    p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper
    processor_manager.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start
    self.start_in_async()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async
    simple_dags = self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat
    zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies
    LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all
    return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection
    execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind
    engine, execution_options
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 424, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2194, in contextual_connect
    **kwargs
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 125, in __init__
    self.dispatch.engine_connect(self, self.__branch)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 297, in __call__
    fn(*args, **kw)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 79, in ping_connection
    log.warning("DB connection invalidated. Reconnecting...", err)
Message: 'DB connection invalidated. Reconnecting...'
Arguments: (OperationalError('(psycopg2.OperationalError) server closed the connection
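An aside on the tail of the log above: the {{Message:}} / {{Arguments:}} lines are what Python's logging module prints when a handler fails to format a record. The call at line 79, {{log.warning("DB connection invalidated. Reconnecting...", err)}}, passes {{err}} as a %-format argument without a matching placeholder, so formatting fails inside the handler and logging dumps the record (with its own "Call stack:") instead of emitting the message. A minimal, dependency-free sketch of the difference (the logger name and the error below are illustrative, not Airflow's):

```python
import io
import logging

# Capture what the handler successfully emits.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
log = logging.getLogger("demo")
log.addHandler(handler)
log.propagate = False  # keep root/lastResort handlers out of the demo

err = OSError("server closed the connection unexpectedly")

# Buggy form (like line 79): `err` has no %s placeholder, so %-formatting
# raises inside the handler; logging prints a "--- Logging error ---" block
# (with Message:/Arguments: lines) to stderr, and nothing reaches `stream`.
log.warning("DB connection invalidated. Reconnecting...", err)

# Fixed form: the placeholder consumes the argument and the message is logged.
log.warning("DB connection invalidated. Reconnecting... %s", err)

print(stream.getvalue())
```

Only the fixed call lands in the captured stream; the buggy one produces exactly the kind of fallback dump seen in this issue's log excerpt.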
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134:
- Description:
I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently.
To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute
    self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper
    self.processor_agent.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start
    self._async_mode)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process
    p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper
    processor_manager.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start
    self.start_in_async()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async
    simple_dags = self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat
    zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies
    LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all
    return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection
    execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind
    engine,
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134:
- Description:
I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently.
To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
Here's the traceback:
{code}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection
    connection.scalar(select([1]))
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar
    return self.execute(object_, *multiparams, **params).scalar()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8)
{code}
It has something to do with the configure_orm function in airflow/settings.py, because that is the only usage of setup_event_handlers (from airflow/utils/sqlalchemy.py). And if I disable connection pooling, the warning seems to go away. Beyond that, I am not sure where to go from here, but something must be wrong.

was:
I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently.
To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
Here's the traceback:
{code}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection
    connection.scalar(select([1]))
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar
    return self.execute(object_, *multiparams, **params).scalar()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File
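The behavior described above is consistent with a pessimistic "ping on checkout" pool: every zombie check runs a query, each query checks a connection out of the pool, the handler pings it with SELECT 1, and if the server has since closed the idle connection the ping fails and the pool reconnects with a warning. A dependency-free sketch of that pattern (all class and attribute names here are illustrative, not Airflow's or SQLAlchemy's):

```python
class FakeConnection:
    """Stand-in for a DB connection that the server may close under us."""
    def __init__(self):
        self.alive = True

    def execute(self, sql):
        if not self.alive:
            raise ConnectionError("server closed the connection unexpectedly")
        return 1


class SimplePool:
    """One-slot pool that pings on checkout and reconnects on failure."""
    def __init__(self):
        self._conn = FakeConnection()
        self.reconnects = 0

    def checkout(self):
        try:
            self._conn.execute("SELECT 1")  # the pessimistic ping
        except ConnectionError:
            # Connection invalidated -> warn and reconnect, as in the log
            self._conn = FakeConnection()
            self.reconnects += 1
        return self._conn


pool = SimplePool()
pool.checkout()              # healthy connection: no reconnect
pool._conn.alive = False     # simulate the server closing the idle connection
conn = pool.checkout()       # ping fails -> pool reconnects
print(pool.reconnects)       # -> 1
```

With a short heartbeat interval and a server-side idle timeout shorter than the gap between checkouts, every checkout hits the failure branch, which would make the warning fire on every zombie check exactly as reported.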
[jira] [Updated] (AIRFLOW-4134) DB connection invalidated warning at every zombie check
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134: - Description: I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. I to try to diagnose I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated. Here's the traceback: {code} webserver_1 | Traceback (most recent call last): webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection webserver_1 | connection.scalar(select([1])) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar webserver_1 | return self.execute(object_, *multiparams, **params).scalar() webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute webserver_1 | return meth(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection webserver_1 | return connection._execute_clauseelement(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement webserver_1 | distilled_params, webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context webserver_1 | e, statement, parameters, cursor, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception webserver_1 | util.raise_from_cause(sqlalchemy_exception, exc_info) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause webserver_1 | reraise(type(exception), exception, tb=exc_tb, cause=cause) webserver_1 | File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise webserver_1 | raise value.with_traceback(tb) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context webserver_1 | cursor, statement, parameters, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute webserver_1 | cursor.execute(statement, parameters) webserver_1 | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly webserver_1 | This probably means the server terminated abnormally webserver_1 | before or while processing the request. webserver_1 | [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8) {code} It has something to do with the configure_orm function in airflow/settings.py, because that is the only usage of setup_event_handlers (from airflow/utils/sqlalchemy.py). I am not sure where to go from here. But something must be wrong. was: I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. I to try to diagnose I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated. 
[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check
[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4134: - Summary: "DB connection invalidated" warning at every zombie check (was: DB connection invalidated warning at every zombie check) > "DB connection invalidated" warning at every zombie check > - > > Key: AIRFLOW-4134 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4134 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: 1.10.2 >Reporter: Daniel Standish >Priority: Major > > I am finding with 1.10.2 that I seem to get a warning {{DB connection > invalidated. Reconnecting...}} very frequently. > To try to diagnose, I added logging of the triggering error on line 79 in > airflow/utils/sqlalchemy.py, from which this warning is generated. > Here's the traceback: > {code} > webserver_1 | Traceback (most recent call last): > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line > 68, in ping_connection > webserver_1 | connection.scalar(select([1])) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, > in scalar > webserver_1 | return self.execute(object_, *multiparams, > **params).scalar() > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, > in execute > webserver_1 | return meth(self, multiparams, params) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line > 273, in _execute_on_connection > webserver_1 | return connection._execute_clauseelement(self, > multiparams, params) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line > 1099, in _execute_clauseelement > webserver_1 | distilled_params, > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line > 1240, in _execute_context > webserver_1 | e, statement, parameters, cursor, context > 
webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line > 1458, in _handle_dbapi_exception > webserver_1 | util.raise_from_cause(sqlalchemy_exception, exc_info) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, > in raise_from_cause > webserver_1 | reraise(type(exception), exception, tb=exc_tb, cause=cause) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, > in reraise > webserver_1 | raise value.with_traceback(tb) > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line > 1236, in _execute_context > webserver_1 | cursor, statement, parameters, context > webserver_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line > 536, in do_execute > webserver_1 | cursor.execute(statement, parameters) > webserver_1 | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) > server closed the connection unexpectedly > webserver_1 |This probably means the server terminated abnormally > webserver_1 |before or while processing the request. > webserver_1 | [SQL: 'SELECT 1'] (Background on this error at: > http://sqlalche.me/e/e3q8) > {code} > It has something to do with the configure_orm function in > airflow/settings.py, because that is the only usage of setup_event_handlers > (from airflow/utils/sqlalchemy.py). > I am not sure where to go from here. But something must be wrong. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-4134) DB connection invalidated warning at every zombie check
Daniel Standish created AIRFLOW-4134: Summary: DB connection invalidated warning at every zombie check Key: AIRFLOW-4134 URL: https://issues.apache.org/jira/browse/AIRFLOW-4134 Project: Apache Airflow Issue Type: Bug Affects Versions: 1.10.2 Reporter: Daniel Standish I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. To try to diagnose, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated. Here's the traceback: {code} webserver_1 | Traceback (most recent call last): webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection webserver_1 | connection.scalar(select([1])) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar webserver_1 | return self.execute(object_, *multiparams, **params).scalar() webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute webserver_1 | return meth(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection webserver_1 | return connection._execute_clauseelement(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement webserver_1 | distilled_params, webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context webserver_1 | e, statement, parameters, cursor, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception webserver_1 | util.raise_from_cause(sqlalchemy_exception, exc_info) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause webserver_1 | reraise(type(exception), 
exception, tb=exc_tb, cause=cause) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise webserver_1 | raise value.with_traceback(tb) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context webserver_1 | cursor, statement, parameters, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute webserver_1 | cursor.execute(statement, parameters) webserver_1 | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly webserver_1 | This probably means the server terminated abnormally webserver_1 | before or while processing the request. webserver_1 | [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8) {code} I am not sure where to go from here. But something must be wrong. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
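A minimal, library-agnostic sketch of the pattern behind this warning may help: per the traceback, ping_connection probes the pooled connection with SELECT 1 before use and, when the probe fails, logs the warning and reconnects. `FakeConnection`, `StaleConnectionError`, and the `reconnect` callback below are illustrative stand-ins, not Airflow's or SQLAlchemy's actual classes.

```python
import logging

class StaleConnectionError(Exception):
    """Illustrative stand-in for a DBAPI error such as psycopg2.OperationalError."""

class FakeConnection:
    """Illustrative stand-in for a pooled DB connection."""
    def __init__(self, alive=True):
        self.alive = alive

    def scalar(self, sql):
        if not self.alive:
            raise StaleConnectionError("server closed the connection unexpectedly")
        return 1

def ping_connection(conn, reconnect):
    # Pessimistic ping: probe with SELECT 1 before handing the connection out.
    # On failure, emit the warning seen in the report and fetch a fresh
    # connection, roughly what the handler wired up by setup_event_handlers
    # does for the engine.
    try:
        conn.scalar("SELECT 1")
        return conn
    except StaleConnectionError:
        logging.warning("DB connection invalidated. Reconnecting...")
        return reconnect()

# A healthy connection passes through unchanged; a dead one is replaced.
healthy = ping_connection(FakeConnection(), reconnect=lambda: FakeConnection())
replaced = ping_connection(FakeConnection(alive=False), reconnect=lambda: FakeConnection())
```

If the probe fails on every zombie check, that hedges toward the pool handing back connections the server already closed, rather than the probe logic itself being broken.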
[jira] [Closed] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval
[ https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish closed AIRFLOW-4056. Resolution: Invalid > Dag file processing does not respect dag_dir_list_interval > -- > > Key: AIRFLOW-4056 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4056 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.2 > Environment: I have confirmed this issue on mac and centos > environments, using mysql backend. >Reporter: Daniel Standish >Priority: Major > > The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag > directory scanning. > It seems to happen every 2 seconds, no matter what. The default is supposed > to be 5 minutes. > As a result I see a scheduler output like this: > {code:java} > [2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection > invalidated. Reconnecting... > [2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection > invalidated. Reconnecting... 
> [2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing > results{code} > And no more is there the periodic printing of dag stats, like there was in > 1.10.1. > I can confirm that this is happening by adding this to something in dag > folder: > {code:python} > with open(Path('~/temp/test.log').expanduser(), 'at') as f: > f.write(f"{datetime.now()}: i am imported\n") > {code} > Here is some scheduler output with debug log level: > {code} > _ > |__( )_ __/__ / __ > /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / > ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / > _/_/ |_/_/ /_//_//_/ \//|__/ > [2019-03-09 17:20:59,042] {jobs.py:1477} INFO - Starting the scheduler > [2019-03-09 17:20:59,042] {jobs.py:1485} INFO - Running execute loop for -1 > seconds > [2019-03-09 17:20:59,043] {jobs.py:1486} INFO - Processing each file at most > -1 times > [2019-03-09 17:20:59,043] {jobs.py:1489} INFO - Searching for files in > /Users/dstandish/code/python_tfgetl/tfgetl/dags > [2019-03-09 17:20:59,046] {jobs.py:1491} INFO - There are 11 files in > /Users/dstandish/code/python_tfgetl/tfgetl/dags > [2019-03-09 17:20:59,105] {jobs.py:1534} INFO - Resetting orphaned tasks for > active dag runs > [2019-03-09 17:20:59,121] {dag_processing.py:453} INFO - Launched > DagFileProcessorManager with pid: 57333 > [2019-03-09 17:20:59,122] {jobs.py:1548} DEBUG - Starting Loop... 
> [2019-03-09 17:20:59,122] {jobs.py:1559} INFO - Harvesting DAG parsing results > [2019-03-09 17:20:59,123] {jobs.py:1595} DEBUG - Heartbeating the executor > [2019-03-09 17:20:59,123] {base_executor.py:118} DEBUG - 0 running task > instances > [2019-03-09 17:20:59,123] {base_executor.py:119} DEBUG - 0 in queue > [2019-03-09 17:20:59,123] {base_executor.py:120} DEBUG - 32 open slots > [2019-03-09 17:20:59,124] {base_executor.py:149} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method > [2019-03-09 17:20:59,128] {jobs.py:1613} DEBUG - Ran scheduling loop in 0.01 > seconds > [2019-03-09 17:20:59,129] {jobs.py:1614} DEBUG - Sleeping for 1.00 seconds > [2019-03-09 17:20:59,130] {settings.py:51} INFO - Configured default timezone > > [2019-03-09 17:20:59,133] {logging_config.py:63} DEBUG - Unable to load > custom logging, using default config instead > [2019-03-09 17:20:59,143] {settings.py:146} DEBUG - Setting up DB connection
[jira] [Resolved] (AIRFLOW-3901) Add optional role parameter to snowflake hook
[ https://issues.apache.org/jira/browse/AIRFLOW-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish resolved AIRFLOW-3901. -- Resolution: Fixed Fix Version/s: (was: 1.10.3) > Add optional role parameter to snowflake hook > - > > Key: AIRFLOW-3901 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3901 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Reporter: Daniel Standish >Assignee: Daniel Standish >Priority: Trivial > Original Estimate: 1h > Remaining Estimate: 1h > > Role is a parameter missing from snowflake hook. > I will add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
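The role addition can stay backward compatible by treating it as optional. A hedged sketch, assuming the role is read from the Airflow Connection's extra JSON; `build_connect_kwargs` and its field names are hypothetical illustrations, not the actual SnowflakeHook API.

```python
import json

def build_connect_kwargs(login, password, schema, extra_json):
    # Hypothetical helper: thread an optional `role` from the connection's
    # extra JSON into the kwargs passed to the Snowflake connector.
    extra = json.loads(extra_json or "{}")
    kwargs = {"user": login, "password": password, "schema": schema}
    if extra.get("role"):
        # Pass role only when the connection sets it, so existing
        # connections keep the user's default role in Snowflake.
        kwargs["role"] = extra["role"]
    return kwargs

kwargs = build_connect_kwargs("me", "secret", "public", '{"role": "ANALYST"}')
assert kwargs["role"] == "ANALYST"
assert "role" not in build_connect_kwargs("me", "secret", "public", None)
```

Omitting the key entirely, rather than passing `role=None`, leaves the connector's own default resolution untouched.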
[jira] [Updated] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval
[ https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4056: - Description: The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag directory scanning. It seems to happen every 2 seconds, no matter what. The default is supposed to be 5 minutes. As a result I see a scheduler output like this: {code:java} [2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... [2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... 
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing results{code} Also, the periodic printing of dag stats that was there in 1.10.1 no longer appears. I can confirm this by adding the following to a file in the dag folder: {code:python} with open(Path('~/temp/test.log').expanduser(), 'at') as f: f.write(f"{datetime.now()}: i am imported\n") {code} Here is some scheduler output with debug log level: {code} _ |__( )_ __/__ / __ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_//_//_/ \//|__/ [2019-03-09 17:20:59,042] {jobs.py:1477} INFO - Starting the scheduler [2019-03-09 17:20:59,042] {jobs.py:1485} INFO - Running execute loop for -1 seconds [2019-03-09 17:20:59,043] {jobs.py:1486} INFO - Processing each file at most -1 times [2019-03-09 17:20:59,043] {jobs.py:1489} INFO - Searching for files in /Users/dstandish/code/python_tfgetl/tfgetl/dags [2019-03-09 17:20:59,046] {jobs.py:1491} INFO - There are 11 files in /Users/dstandish/code/python_tfgetl/tfgetl/dags [2019-03-09 17:20:59,105] {jobs.py:1534} INFO - Resetting orphaned tasks for active dag runs [2019-03-09 17:20:59,121] {dag_processing.py:453} INFO - Launched DagFileProcessorManager with pid: 57333 [2019-03-09 17:20:59,122] {jobs.py:1548} DEBUG - Starting Loop... 
[2019-03-09 17:20:59,122] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:20:59,123] {jobs.py:1595} DEBUG - Heartbeating the executor [2019-03-09 17:20:59,123] {base_executor.py:118} DEBUG - 0 running task instances [2019-03-09 17:20:59,123] {base_executor.py:119} DEBUG - 0 in queue [2019-03-09 17:20:59,123] {base_executor.py:120} DEBUG - 32 open slots [2019-03-09 17:20:59,124] {base_executor.py:149} DEBUG - Calling the sync method [2019-03-09 17:20:59,128] {jobs.py:1613} DEBUG - Ran scheduling loop in 0.01 seconds [2019-03-09 17:20:59,129] {jobs.py:1614} DEBUG - Sleeping for 1.00 seconds [2019-03-09 17:20:59,130] {settings.py:51} INFO - Configured default timezone [2019-03-09 17:20:59,133] {logging_config.py:63} DEBUG - Unable to load custom logging, using default config instead [2019-03-09 17:20:59,143] {settings.py:146} DEBUG - Setting up DB connection pool (PID 57333) [2019-03-09 17:20:59,144] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=57333 [2019-03-09 17:20:59,256] {settings.py:201} DEBUG - Disposing DB connection pool (PID 57334) [2019-03-09 17:20:59,267] {settings.py:201} DEBUG - Disposing DB connection pool (PID 57337) [2019-03-09 17:20:59,610] {settings.py:201} DEBUG - Disposing DB connection pool (PID 57336) [2019-03-09 17:20:59,752] {settings.py:201} DEBUG - Disposing DB connection pool (PID 57335) [2019-03-09 17:21:00,130] {jobs.py:1627} DEBUG - Sleeping for 0.99 seconds to prevent excessive logging [2019-03-09 17:21:00,156] {jobs.py:489} DEBUG - Waiting for [2019-03-09 17:21:00,161]
[jira] [Updated] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval
[ https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Standish updated AIRFLOW-4056: - Description: The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag directory scanning. It seems to happen every 2 seconds, no matter what. The default is supposed to be 5 minutes. As a result I see a scheduler output like this: {code:java} [2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... [2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... 
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing results{code} And no more is there the periodic printing of dag stats, like there was in 1.10.1. I can confirm that this is happening by adding this to something in dag folder: {code:java} with open(Path('~/temp/test.log').expanduser(), 'at') as f: f.write(f"{datetime.now()}: i am imported\n") {code} was: The conf parameter dag_dir_list_interval seems to have no effect on dag directory scanning. It seems to happen every 2 seconds, no matter what. The default is supposed to be 5 minutes. As a result I see a scheduler output like this: {code:java} [2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... 
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting... [2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results [2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing results{code} And no more is there the periodic printing of dag stats, like there was in 1.10.1. I can confirm that this is happening by adding this to something in dag folder: {code:java} with open(Path('~/temp/test.log').expanduser(), 'at') as f: f.write(f"{datetime.now()}: i am imported\n") {code} > Dag file processing does not respect dag_dir_list_interval > -- > > Key: AIRFLOW-4056 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4056 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.2 > Environment: I have confirmed this issue on mac and centos > environments, using mysql backend. >Reporter: Daniel Standish >Priority: Major > > The conf parameter
[jira] [Created] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval
Daniel Standish created AIRFLOW-4056:

Summary: Dag file processing does not respect dag_dir_list_interval
Key: AIRFLOW-4056
URL: https://issues.apache.org/jira/browse/AIRFLOW-4056
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Affects Versions: 1.10.2
Environment: I have confirmed this issue on mac and centos environments, using mysql backend.
Reporter: Daniel Standish

The conf parameter dag_dir_list_interval seems to have no effect on dag directory scanning. Scanning seems to happen every 2 seconds, no matter what, while the default is supposed to be 5 minutes. As a result I see scheduler output like this:

{code:java}
[2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting...
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection invalidated. Reconnecting...
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing results
{code}

The periodic printing of dag stats that was present in 1.10.1 is also gone. I can confirm that the rescanning is happening by adding the following to a file in the dag folder:

{code:python}
from datetime import datetime
from pathlib import Path

# appends a timestamped line each time the scheduler re-imports this file
with open(Path('~/temp/test.log').expanduser(), 'at') as f:
    f.write(f"{datetime.now()}: i am imported\n")
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
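For reference, the interval in question lives in the [scheduler] section of airflow.cfg; a minimal fragment, assuming the stock config layout:

```ini
[scheduler]
# Seconds between scans of the DAGs directory for new files.
# The documented default is 300 (5 minutes); per this report,
# scanning happens roughly every 2 seconds regardless of the value set.
dag_dir_list_interval = 300
```

The same setting can also be supplied via the AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL environment variable.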
[jira] [Updated] (AIRFLOW-3901) Add optional role parameter to snowflake hook
[ https://issues.apache.org/jira/browse/AIRFLOW-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Standish updated AIRFLOW-3901:
-
Fix Version/s: 1.10.3

> Add optional role parameter to snowflake hook
> -
>
> Key: AIRFLOW-3901
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3901
> Project: Apache Airflow
> Issue Type: Improvement
> Components: contrib
> Reporter: Daniel Standish
> Assignee: Daniel Standish
> Priority: Trivial
> Fix For: 1.10.3
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Role is a parameter missing from snowflake hook.
> I will add it.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3901) Add optional role parameter to snowflake hook
Daniel Standish created AIRFLOW-3901:

Summary: Add optional role parameter to snowflake hook
Key: AIRFLOW-3901
URL: https://issues.apache.org/jira/browse/AIRFLOW-3901
Project: Apache Airflow
Issue Type: Improvement
Components: contrib
Reporter: Daniel Standish
Assignee: Daniel Standish

Role is a parameter missing from the snowflake hook. I will add it.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
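A minimal sketch of the shape of this change: thread an optional role through to the connection config only when one is provided. The function name and dict keys here are illustrative, not the hook's actual API; the underlying snowflake.connector.connect does accept a role keyword, so a hook can forward it unchanged.

```python
def build_conn_config(account, user, password, role=None):
    """Assemble a connection config dict, adding role only when given."""
    config = {
        "account": account,
        "user": user,
        "password": password,
    }
    # Only pass role through when it was explicitly provided, so existing
    # connections without a role keep their current default behavior.
    if role is not None:
        config["role"] = role
    return config
```

Keeping the parameter optional means connections configured before the change need no migration.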