Repository: incubator-airflow Updated Branches: refs/heads/master 2fef9152b -> 1bde78338
[AIRFLOW-1779] Add keepalive packets to ssh hook Make use of paramiko's set_keepalive method to send keepalive packets every keepalive_interval seconds. This will prevent long running queries with no terminal output from being termanated as idle, for example by an intermediate NAT. Set on by default with a 30 second interval. Closes #2749 from RJKeevil/add-sshhook-keepalive Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1bde7833 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1bde7833 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1bde7833 Branch: refs/heads/master Commit: 1bde7833854210676dcd6e8da8b6d9567e12031a Parents: 2fef915 Author: Rob Keevil <robkee...@gmail.com> Authored: Thu Nov 2 18:57:09 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Thu Nov 2 18:57:09 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/ssh_hook.py | 9 ++++++++- tests/contrib/hooks/test_ssh_hook.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1bde7833/airflow/contrib/hooks/ssh_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py index b061fd7..a85911b 100755 --- a/airflow/contrib/hooks/ssh_hook.py +++ b/airflow/contrib/hooks/ssh_hook.py @@ -46,6 +46,8 @@ class SSHHook(BaseHook, LoggingMixin): :type key_file: str :param timeout: timeout for the attempt to connect to the remote_host. :type timeout: int + :param keepalive_interval: send a keepalive packet to remote host every keepalive_interval seconds + :type keepalive_interval: int """ def __init__(self, @@ -54,7 +56,8 @@ class SSHHook(BaseHook, LoggingMixin): username=None, password=None, key_file=None, - timeout=10 + timeout=10, + keepalive_interval=30 ): super(SSHHook, self).__init__(ssh_conn_id) self.ssh_conn_id = ssh_conn_id @@ -63,6 +66,7 @@ class SSHHook(BaseHook, LoggingMixin): self.password = password self.key_file = key_file self.timeout = timeout + self.keepalive_interval = keepalive_interval # Default values, overridable from Connection self.compress = True self.no_host_key_check = True @@ -140,6 +144,9 @@ class SSHHook(BaseHook, LoggingMixin): compress=self.compress, sock=host_proxy) + if self.keepalive_interval: + client.get_transport().set_keepalive(self.keepalive_interval) + self.client = client except paramiko.AuthenticationException as auth_error: self.log.error( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1bde7833/tests/contrib/hooks/test_ssh_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_ssh_hook.py b/tests/contrib/hooks/test_ssh_hook.py index a556332..6f35431 100644 --- a/tests/contrib/hooks/test_ssh_hook.py +++ b/tests/contrib/hooks/test_ssh_hook.py @@ -33,7 +33,7 @@ class SSHHookTest(unittest.TestCase): def setUp(self): configuration.load_test_config() from airflow.contrib.hooks.ssh_hook import SSHHook - self.hook = SSHHook(ssh_conn_id='ssh_default') + self.hook = SSHHook(ssh_conn_id='ssh_default', keepalive_interval=10) self.hook.no_host_key_check = True def test_ssh_connection(self):