This is an automated email from the ASF dual-hosted git repository.
dabla pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6e6d04e709 Fixed output encoding in WinRMTrigger for WinRMOperator in deferred mode (#64154)
a6e6d04e709 is described below
commit a6e6d04e70922c3ce756ca12757c73385b072f59
Author: David Blain <[email protected]>
AuthorDate: Thu Mar 26 10:56:09 2026 +0100
Fixed output encoding in WinRMTrigger for WinRMOperator in deferred mode (#64154)
* refactor: Fixed output encoding with WinRMTrigger
---
.../providers/microsoft/winrm/operators/winrm.py | 28 +++++++++--
.../providers/microsoft/winrm/triggers/winrm.py | 13 +++---
.../unit/microsoft/winrm/operators/test_winrm.py | 54 ++++++++++++++++++++++
.../unit/microsoft/winrm/triggers/test_winrm.py | 2 -
4 files changed, 86 insertions(+), 11 deletions(-)
diff --git a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
index c71e7eb6fca..0253cb95c32 100644
--- a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
+++ b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
@@ -142,7 +142,6 @@ class WinRMOperator(BaseOperator):
ssh_conn_id=self.hook.ssh_conn_id,
shell_id=shell_id,
command_id=command_id,
- output_encoding=self.output_encoding,
return_output=self.do_xcom_push,
max_output_chunks=self.max_output_chunks,
poll_interval=self.poll_interval,
@@ -185,14 +184,37 @@ class WinRMOperator(BaseOperator):
if enable_pickling:
return stdout_buffer
- return
b64encode(b"".join(stdout_buffer)).decode(self.output_encoding)
+ # Base64-encoded text is ASCII. Decode using ASCII to avoid
+ # producing non-ASCII characters when users set output_encoding
+ # to encodings like UTF-16.
+ return b64encode(b"".join(stdout_buffer)).decode("ascii")
stderr_output = b"".join(stderr_buffer).decode(self.output_encoding)
error_msg = f"Error running cmd: {self.command}, return code:
{return_code}, error: {stderr_output}"
raise AirflowException(error_msg)
def _decode(self, output: str) -> bytes:
- decoded_output = base64.standard_b64decode(output)
+ # The trigger emits base64 text which MUST be ASCII. Encode to ASCII
+ # bytes before decoding. Provide a clear error if the string contains
+ # non-ASCII characters so users can diagnose mismatched encodings.
+ try:
+ if isinstance(output, str):
+ output_bytes = output.encode("ascii")
+ else:
+ output_bytes = output
+ except UnicodeEncodeError as e:
+ raise AirflowException(
+ "Failed to decode base64 output from trigger: base64 text
contains non-ASCII characters. "
+ "This likely means the trigger encoded the base64 text using
the wrong text encoding."
+ ) from e
+
+ try:
+ decoded_output = base64.standard_b64decode(output_bytes)
+ except Exception as e:
+ raise AirflowException(
+ f"Failed to base64-decode trigger output: {e}. Ensure the
trigger returns ASCII base64 strings."
+ ) from e
+
self.hook.log_output(decoded_output,
output_encoding=self.output_encoding)
return decoded_output
diff --git a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/triggers/winrm.py b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/triggers/winrm.py
index 61bb37b7d32..3934773736e 100644
--- a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/triggers/winrm.py
+++ b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/triggers/winrm.py
@@ -49,7 +49,6 @@ class WinRMCommandOutputTrigger(BaseTrigger):
though priority is given to the params passed during init.
:param shell_id: The shell id on the remote machine.
:param command_id: The command id executed on the remote machine.
- :param output_encoding: the encoding used to decode stout and stderr,
defaults to utf-8.
:param return_output: Whether to accumulate and return the stdout or not,
defaults to True.
:param poll_interval: How often, in seconds, the trigger should poll the
output command of the launched command,
defaults to 1.
@@ -62,7 +61,6 @@ class WinRMCommandOutputTrigger(BaseTrigger):
ssh_conn_id: str,
shell_id: str,
command_id: str,
- output_encoding: str = "utf-8",
return_output: bool = True,
poll_interval: float = 1,
max_output_chunks: int = 100,
@@ -71,7 +69,6 @@ class WinRMCommandOutputTrigger(BaseTrigger):
self.ssh_conn_id = ssh_conn_id
self.shell_id = shell_id
self.command_id = command_id
- self.output_encoding = output_encoding
self.return_output = return_output
self.poll_interval = poll_interval
self._stdout: deque[str] = deque(maxlen=max_output_chunks)
@@ -85,7 +82,6 @@ class WinRMCommandOutputTrigger(BaseTrigger):
"ssh_conn_id": self.ssh_conn_id,
"shell_id": self.shell_id,
"command_id": self.command_id,
- "output_encoding": self.output_encoding,
"return_output": self.return_output,
"poll_interval": self.poll_interval,
"max_output_chunks": self._stdout.maxlen,
@@ -115,9 +111,14 @@ class WinRMCommandOutputTrigger(BaseTrigger):
) = await self.get_command_output(conn)
if self.return_output and stdout:
-
self._stdout.append(base64.standard_b64encode(stdout).decode(self.output_encoding))
+ # Base64 output is ASCII text regardless of the command
output encoding.
+ # Decode using ASCII to avoid producing non-ASCII strings
when a
+ # different output_encoding (e.g. UTF-16) is used for
decoding
+ # the original command bytes.
+
self._stdout.append(base64.standard_b64encode(stdout).decode("ascii"))
if stderr:
-
self._stderr.append(base64.standard_b64encode(stderr).decode(self.output_encoding))
+ # Same reasoning for stderr as for stdout.
+
self._stderr.append(base64.standard_b64encode(stderr).decode("ascii"))
if not command_done:
await asyncio.sleep(self.poll_interval)
diff --git a/providers/microsoft/winrm/tests/unit/microsoft/winrm/operators/test_winrm.py b/providers/microsoft/winrm/tests/unit/microsoft/winrm/operators/test_winrm.py
index 3544c499ebc..3054b59e27d 100644
--- a/providers/microsoft/winrm/tests/unit/microsoft/winrm/operators/test_winrm.py
+++ b/providers/microsoft/winrm/tests/unit/microsoft/winrm/operators/test_winrm.py
@@ -142,3 +142,57 @@ class TestWinRMOperator:
"stdout": ["aGVsbG8="],
}
assert result == "aGVsbG8="
+
+ @mock.patch("airflow.providers.microsoft.winrm.operators.winrm.WinRMHook")
+ @mock.patch("airflow.providers.microsoft.winrm.triggers.winrm.WinRMHook")
+ def test_execute_deferrable_with_utf16le_output(self, mock_operator_hook,
mock_trigger_hook):
+ # Simulate a command that produces UTF-16-LE encoded output (e.g.,
PowerShell)
+ mock_hook_instance = MagicMock(spec=WinRMHook)
+ mock_hook_instance.ssh_conn_id = "winrm_default"
+ mock_operator_hook.return_value = mock_hook_instance
+ mock_trigger_hook.return_value = mock_hook_instance
+ mock_conn = MagicMock()
+ mock_hook_instance.get_async_conn = AsyncMock(return_value=mock_conn)
+ mock_hook_instance.get_conn.return_value = mock_conn
+
+ original_text = "Hello, 世界"
+ original_bytes = original_text.encode("utf-16-le")
+ mock_hook_instance.get_command_output.return_value = (original_bytes,
b"", 0, True)
+ mock_hook_instance.run_command.return_value = (
+ "043E496C-A9E5-4284-AFCC-78A90E2BCB65",
+ "E4C36903-E59F-43AB-9374-ABA87509F46D",
+ )
+
+ operator = WinRMOperator(
+ task_id="test_task",
+ winrm_hook=mock_hook_instance,
+ command="dir",
+ deferrable=True,
+ output_encoding="utf-16-le",
+ )
+
+ result, events = execute_operator(operator)
+
+ assert len(events) == 1
+ assert isinstance(events[0], TriggerEvent)
+
+ # Expect the trigger payload to contain ASCII base64 of the raw bytes
+ expected_b64 = b64encode(original_bytes).decode("ascii")
+ assert events[0].payload == {
+ "command_id": "E4C36903-E59F-43AB-9374-ABA87509F46D",
+ "return_code": 0,
+ "shell_id": "043E496C-A9E5-4284-AFCC-78A90E2BCB65",
+ "status": "success",
+ "stderr": [],
+ "stdout": [expected_b64],
+ }
+
+ # Operator should return the same ASCII base64 string
+ assert result == expected_b64
+
+ # Ensure the hook's log_output was called with the decoded bytes and
the configured encoding
+ assert mock_hook_instance.log_output.called
+ logged_args, logged_kwargs = mock_hook_instance.log_output.call_args
+ assert logged_args[0] == original_bytes
+ assert logged_args[0].decode("utf-16-le") == original_text
+ assert logged_kwargs.get("output_encoding") == "utf-16-le"
diff --git a/providers/microsoft/winrm/tests/unit/microsoft/winrm/triggers/test_winrm.py b/providers/microsoft/winrm/tests/unit/microsoft/winrm/triggers/test_winrm.py
index 511725e6867..869f23443c7 100644
--- a/providers/microsoft/winrm/tests/unit/microsoft/winrm/triggers/test_winrm.py
+++ b/providers/microsoft/winrm/tests/unit/microsoft/winrm/triggers/test_winrm.py
@@ -32,7 +32,6 @@ class TestWinRMCommandOutputTrigger:
ssh_conn_id="ssh_conn_id",
shell_id="043E496C-A9E5-4284-AFCC-78A90E2BCB65",
command_id="E4C36903-E59F-43AB-9374-ABA87509F46D",
- output_encoding="utf-8",
return_output=True,
poll_interval=10,
max_output_chunks=100,
@@ -46,7 +45,6 @@ class TestWinRMCommandOutputTrigger:
"ssh_conn_id": "ssh_conn_id",
"shell_id": "043E496C-A9E5-4284-AFCC-78A90E2BCB65",
"command_id": "E4C36903-E59F-43AB-9374-ABA87509F46D",
- "output_encoding": "utf-8",
"return_output": True,
"poll_interval": 10,
"max_output_chunks": 100,