This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new b1d31df Fix: Mitigate CRLF injection in email headers (Issue #603)
b1d31df is described below
commit b1d31df619d01ebf48a0cca1ff378ae0d10f5381
Author: Abhishek Mishra <[email protected]>
AuthorDate: Thu Jan 29 19:38:25 2026 +0000
Fix: Mitigate CRLF injection in email headers (Issue #603)
- Use EmailMessage with policy.SMTPUTF8 for proper Unicode handling
- Use Address objects for From/To headers to prevent CRLF injection
- Catch ValueError from header assignment to detect injection attempts
- Change import to 'message' module to avoid naming conflicts
- Rename function parameter to 'msg_data', use 'msg' for EmailMessage
- Set Date header to UTC/GMT using formatdate(usegmt=True)
- Use bytes(msg_text, 'utf-8') for encoding
- Add comprehensive CRLF injection tests for all header fields
- Add test demonstrating SMTPUTF8 policy advantages over SMTP
- Verify Date header uses GMT (+0000) timezone
Addresses security vulnerability per ASVS 1.2.1 and maintainer feedback.
---
atr/mail.py | 57 ++++---
tests/unit/test_mail.py | 385 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 418 insertions(+), 24 deletions(-)
diff --git a/atr/mail.py b/atr/mail.py
index f0cfa83..f15e3c6 100644
--- a/atr/mail.py
+++ b/atr/mail.py
@@ -16,6 +16,9 @@
# under the License.
import dataclasses
+import email.headerregistry as headerregistry
+import email.message as message
+import email.policy as policy
import email.utils as utils
import ssl
import time
@@ -47,40 +50,46 @@ class Message:
in_reply_to: str | None = None
-async def send(message: Message) -> tuple[str, list[str]]:
+async def send(msg_data: Message) -> tuple[str, list[str]]:
"""Send an email notification about an artifact or a vote."""
- log.info(f"Sending email for event: {message}")
- from_addr = message.email_sender
+ log.info(f"Sending email for event: {msg_data}")
+ from_addr = msg_data.email_sender
if not from_addr.endswith(f"@{global_domain}"):
raise ValueError(f"from_addr must end with @{global_domain}, got {from_addr}")
- to_addr = message.email_recipient
+ to_addr = msg_data.email_recipient
_validate_recipient(to_addr)
# UUID4 is entirely random, with no timestamp nor namespace
# It does have 6 version and variant bits, so only 122 bits are random
mid = f"{uuid.uuid4()}@{global_domain}"
- headers = [
- f"From: {from_addr}",
- f"To: {to_addr}",
- f"Subject: {message.subject}",
- f"Date: {utils.formatdate(localtime=True)}",
- f"Message-ID: <{mid}>",
- ]
- if message.in_reply_to is not None:
- headers.append(f"In-Reply-To: <{message.in_reply_to}>")
- # TODO: Add message.references if necessary
- headers.append(f"References: <{message.in_reply_to}>")
-
- # Normalise the body padding and ensure that line endings are CRLF
- body = message.body.strip()
- body = body.replace("\r\n", "\n")
- body = body.replace("\n", "\r\n")
- body = body + "\r\n"
-
- # Construct the message
- msg_text = "\r\n".join(headers) + "\r\n\r\n" + body
+
+ # Use EmailMessage with Address objects for CRLF injection protection
+ msg = message.EmailMessage(policy=policy.SMTPUTF8)
+
+ try:
+ from_local, from_domain = _split_address(from_addr)
+ to_local, to_domain = _split_address(to_addr)
+
+ msg["From"] = headerregistry.Address(username=from_local, domain=from_domain)
+ msg["To"] = headerregistry.Address(username=to_local, domain=to_domain)
+ msg["Subject"] = msg_data.subject
+ msg["Date"] = utils.formatdate(usegmt=True)
+ msg["Message-ID"] = f"<{mid}>"
+
+ if msg_data.in_reply_to is not None:
+ msg["In-Reply-To"] = f"<{msg_data.in_reply_to}>"
+ # TODO: Add message.references if necessary
+ msg["References"] = f"<{msg_data.in_reply_to}>"
+ except ValueError as e:
+ log.error(f"SECURITY: CRLF injection attempt detected in email headers: {e}")
+ return mid, [f"CRLF injection detected: {e}"]
+
+ # Set the email body (handles RFC-compliant line endings automatically)
+ msg.set_content(msg_data.body.strip())
start = time.perf_counter()
+ # Convert to string to satisfy the existing _send_many function signature
+ msg_text = msg.as_string()
log.info(f"sending message: {msg_text}")
errors = await _send_many(from_addr, [to_addr], msg_text)
diff --git a/tests/unit/test_mail.py b/tests/unit/test_mail.py
new file mode 100644
index 0000000..2f5f5e6
--- /dev/null
+++ b/tests/unit/test_mail.py
@@ -0,0 +1,385 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for CRLF injection protection in atr.mail module."""
+
+import email.message as emailmessage
+import email.policy as policy
+from typing import TYPE_CHECKING
+from unittest.mock import AsyncMock
+
+import pytest
+
+import atr.mail as mail
+
+if TYPE_CHECKING:
+ from pytest import MonkeyPatch
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_crlf_in_subject(monkeypatch: "MonkeyPatch") -> None:
+ """Test that CRLF injection in subject field is rejected."""
+ # Mock _send_many to ensure we never actually send emails
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with CRLF in the subject
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Legitimate Subject\r\nBcc: [email protected]",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called (email was not sent)
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_crlf_in_from_address(monkeypatch: "MonkeyPatch") -> None:
+ """Test that CRLF injection in from address field is rejected.
+
+ Note: The from_addr validation happens before EmailMessage processing,
+ so this test verifies the early validation layer also protects against injection.
+ """
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with CRLF in the from address
+ malicious_message = mail.Message(
+ email_sender="[email protected]\r\nBcc: [email protected]",
+ email_recipient="[email protected]",
+ subject="Test Subject",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to raise ValueError due to invalid from_addr format
+ with pytest.raises(ValueError, match=r"from_addr must end with @apache.org"):
+ await mail.send(malicious_message)
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_crlf_in_to_address(monkeypatch: "MonkeyPatch") -> None:
+ """Test that CRLF injection in to address field is rejected.
+
+ Note: The _validate_recipient check happens before EmailMessage processing,
+ so this test verifies the early validation layer also protects against injection.
+ """
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with CRLF in the to address
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]\r\nBcc: [email protected]",
+ subject="Test Subject",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to raise ValueError due to invalid recipient format
+ with pytest.raises(ValueError, match=r"Email recipient must be @apache.org"):
+ await mail.send(malicious_message)
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_crlf_in_reply_to(monkeypatch: "MonkeyPatch") -> None:
+ """Test that CRLF injection in in_reply_to field is rejected."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with CRLF in the in_reply_to field
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test Subject",
+ body="This is a test message",
+ in_reply_to="[email protected]\r\nBcc: [email protected]",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_accepts_legitimate_message(monkeypatch: "MonkeyPatch") -> None:
+ """Test that a legitimate message without CRLF is accepted."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a legitimate message without any CRLF injection attempts
+ legitimate_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Legitimate Subject",
+ body="This is a legitimate test message with no injection attempts.",
+ )
+
+ # Call send
+ mid, errors = await mail.send(legitimate_message)
+
+ # Assert that no errors were returned
+ assert len(errors) == 0
+
+ # Assert that _send_many was called (email was sent)
+ mock_send_many.assert_called_once()
+
+ # Verify the Date header is in GMT
+ call_args = mock_send_many.call_args
+ msg_text = call_args[0][2]
+ date_line = next((line for line in msg_text.splitlines() if line.startswith("Date: ")), "")
+ assert date_line.endswith("+0000") or date_line.endswith("GMT")
+
+ # Verify the Message-ID was generated
+ assert "@apache.org" in mid
+
+
+@pytest.mark.asyncio
+async def test_send_accepts_message_with_reply_to(monkeypatch: "MonkeyPatch") -> None:
+ """Test that a legitimate message with in_reply_to is accepted."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a legitimate message with a valid in_reply_to
+ legitimate_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Re: Previous Subject",
+ body="This is a reply message.",
+ in_reply_to="[email protected]",
+ )
+
+ # Call send
+ mid, errors = await mail.send(legitimate_message)
+
+ # Assert that no errors were returned
+ assert len(errors) == 0
+
+ # Assert that _send_many was called (email was sent)
+ mock_send_many.assert_called_once()
+
+ # Verify the Message-ID was generated
+ assert "@apache.org" in mid
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_lf_only_injection(monkeypatch: "MonkeyPatch") -> None:
+ """Test that injection with LF only (\\n) is also rejected."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with just LF (no CR)
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Legitimate Subject\nBcc: [email protected]",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_cr_only_injection(monkeypatch: "MonkeyPatch") -> None:
+ """Test that injection with CR only (\\r) is also rejected."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with just CR (no LF)
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Legitimate Subject\rBcc: [email protected]",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch") -> None:
+ """Test a realistic Bcc header injection attack scenario."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message attempting to inject a Bcc header
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Important Notice\r\nBcc: [email protected]\r\nX-Priority: 1",
+ body="This message attempts to secretly copy an attacker.",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch") -> None:
+ """Test injection attempting to override Content-Type header."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message attempting to inject Content-Type
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test\r\nContent-Type: text/html\r\n\r\n<html><script>alert('XSS')</script></html>",
+ body="Normal body",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_address_objects_used_for_from_to_headers(monkeypatch: "MonkeyPatch") -> None:
+ """Test that Address objects are used for From/To headers."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ legitimate_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test Subject",
+ body="Test body",
+ )
+
+ _, errors = await mail.send(legitimate_message)
+
+ # Verify the message was sent successfully
+ assert len(errors) == 0
+ mock_send_many.assert_called_once()
+
+ # Verify the generated email bytes contain properly formatted addresses
+ call_args = mock_send_many.call_args
+ msg_text = call_args[0][2] # already a str
+
+ # Address objects format email addresses properly
+ assert "From: [email protected]" in msg_text
+ assert "To: [email protected]" in msg_text
+
+
+@pytest.mark.asyncio
+async def test_send_handles_non_ascii_headers(monkeypatch: "MonkeyPatch") -> None:
+ """Test that non-ASCII characters in headers are handled correctly."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a message with non-ASCII characters in the subject
+ message_with_unicode = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test avec Accént",
+ body="This message has non-ASCII characters in the subject.",
+ )
+
+ # Call send
+ _mid, errors = await mail.send(message_with_unicode)
+
+ # Assert that no errors were returned
+ assert len(errors) == 0
+
+ # Assert that _send_many was called with a string (not bytes)
+ mock_send_many.assert_called_once()
+ call_args = mock_send_many.call_args
+ msg_text = call_args[0][2] # Third argument should be str
+ assert isinstance(msg_text, str)
+
+ # Verify the subject is present in the message
+ assert "Subject: Test avec Accént" in msg_text
+
+
+def test_smtp_policy_vs_smtputf8() -> None:
+ """Test that SMTPUTF8 policy is required for proper Unicode handling.
+
+ This demonstrates why we use policy.SMTPUTF8 instead of policy.SMTP.
+ SMTP policy encodes non-ASCII characters (like é) using RFC2047 encoding,
+ while SMTPUTF8 preserves them directly, which is required for modern SMTP.
+ """
+ # SMTP policy - would encode non-ASCII with RFC2047 (=?utf-8?q?...?=)
+ msg_smtp = emailmessage.EmailMessage(policy=policy.SMTP)
+ msg_smtp["From"] = "[email protected]"
+ msg_smtp["To"] = "[email protected]"
+ msg_smtp["Subject"] = "Test avec é"
+ msg_smtp.set_content("Body")
+
+ smtp_str = msg_smtp.as_string()
+ # SMTP policy encodes non-ASCII, making subjects harder to read
+ assert "=?utf-8?" in smtp_str
+ assert "Test avec é" not in smtp_str
+
+ # SMTPUTF8 policy - preserves Unicode directly (required for our use case)
+ msg_smtputf8 = emailmessage.EmailMessage(policy=policy.SMTPUTF8)
+ msg_smtputf8["From"] = "[email protected]"
+ msg_smtputf8["To"] = "[email protected]"
+ msg_smtputf8["Subject"] = "Test avec é"
+ msg_smtputf8.set_content("Body")
+
+ smtputf8_str = msg_smtputf8.as_string()
+ # SMTPUTF8 preserves the character directly
+ assert "Test avec é" in smtputf8_str
+ assert "=?utf-8?" not in smtputf8_str
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]