Repository: incubator-airflow Updated Branches: refs/heads/master 313f5bac4 -> 1943a96e7
[AIRFLOW-1789][AIRFLOW-1712] Log SSHOperator stderr to log.warning Logging functionality for SSHOperator was added in [AIRFLOW-1712] but it only logged stdout. This commit also logs stderr to log.warning Closes #2761 from OpringaoDoTurno/stderr_in_ssh Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1943a96e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1943a96e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1943a96e Branch: refs/heads/master Commit: 1943a96e708dd68a6990b022ffbbe3729a8c27b8 Parents: 313f5ba Author: Ignasi Peiró <[email protected]> Authored: Wed Nov 8 19:38:39 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Wed Nov 8 19:38:39 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/ssh_operator.py | 48 +++++++++++++++++++++----- 1 file changed, 39 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1943a96e/airflow/contrib/operators/ssh_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py index bb72330..fbbf86c 100644 --- a/airflow/contrib/operators/ssh_operator.py +++ b/airflow/contrib/operators/ssh_operator.py @@ -13,6 +13,7 @@ # limitations under the License. from base64 import b64encode +from select import select from airflow import configuration from airflow.contrib.hooks.ssh_hook import SSHHook @@ -86,26 +87,55 @@ class SSHOperator(BaseOperator): get_pty=get_pty, timeout=self.timeout ) + # get channels + channel = stdout.channel + + # closing stdin stdin.close() - output=b'' - for line in stdout: - output+=line.encode('utf-8') - self.log.info(line.strip('\n')) + channel.shutdown_write() + + agg_stdout=b'' + agg_stderr=b'' + + agg_stdout+=stdout.channel.recv(len(stdout.channel.in_buffer)) + # read from both stdout and stderr + while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): + readq, _, _ = select([channel], [], [], self.timeout) + for c in readq: + if c.recv_ready(): + line = stdout.channel.recv(len(c.in_buffer)) + line = line + agg_stdout+=line + self.log.info(line.decode('utf-8').strip('\n')) + if c.recv_stderr_ready(): + line = stderr.channel.recv_stderr(len(c.in_stderr_buffer)) + line = line + agg_stderr+=line + self.log.warning(line.decode('utf-8').strip('\n')) + if stdout.channel.exit_status_ready() \ + and not stderr.channel.recv_stderr_ready() \ + and not stdout.channel.recv_ready(): + + stdout.channel.shutdown_read() + stdout.channel.close() + break + + stdout.close() + stderr.close() exit_status = stdout.channel.recv_exit_status() if exit_status is 0: - # only returning on output if do_xcom_push is set - # otherwise its not suppose to be disclosed + # returning output if do_xcom_push is set if self.do_xcom_push: enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') if enable_pickling: - return output + return agg_stdout else: - return b64encode(output).decode('utf-8') + return b64encode(agg_stdout).decode('utf-8') else: - error_msg = stderr.read() + error_msg = agg_stderr.decode('utf-8') raise AirflowException("error running cmd: {0}, error: {1}" .format(self.command, error_msg))
