This is an automated email from the ASF dual-hosted git repository.

kentontaylor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/allura.git

commit 156ec6f093c5537a815e004d088013bd68333fbe
Author: Dave Brondsema <dbronds...@slashdotmedia.com>
AuthorDate: Tue May 7 11:56:27 2024 -0400

    prevent dns rebinding
---
 Allura/allura/lib/helpers.py                     | 37 ++++++++++++++++++------
 Allura/allura/lib/validators.py                  |  3 +-
 Allura/allura/tests/test_helpers.py              | 29 ++++++++++++++++++-
 ForgeImporters/forgeimporters/tests/test_base.py |  2 +-
 4 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/Allura/allura/lib/helpers.py b/Allura/allura/lib/helpers.py
index c2e83eec2..0746639f2 100644
--- a/Allura/allura/lib/helpers.py
+++ b/Allura/allura/lib/helpers.py
@@ -41,6 +41,7 @@ import socket
 from functools import partial
 from io import BytesIO
 import html
+import http.client
 
 import emoji
 import tg
@@ -1046,15 +1047,36 @@ class exceptionless:
         return inner
 
 
-class NoRedirectToInternal(urllib.request.HTTPRedirectHandler):
-    def redirect_request(self, req: urllib.request.Request, fp, code, msg, headers, newurl: str):
+class NoInternalHTTPSHandler(urllib.request.HTTPSHandler):
+    def https_open(self, req):
         if not asbool(tg.config.get('urlopen_allow_internal_hostnames', 'false')):
-            validators.URLIsPrivate().to_python(newurl, None)
-        return super().redirect_request(req, fp, code, msg, headers, newurl)
+            # check on every open, covers redirects
+            validators.URLIsPrivate().to_python(req.full_url, None)
+        return super().https_open(req)
 
+class UseKnownIPHTTPConnection(http.client.HTTPConnection):
+    # only for http, since https requires a valid cert it should be ok
+    def __init__(self, *a, known_ip_to_use=None, **kw):
+        super().__init__(*a, **kw)
+        self.known_ip_to_use = known_ip_to_use
 
-opener = urllib.request.build_opener(NoRedirectToInternal)
-urllib.request.install_opener(opener)
+    def connect(self):
+        self.host = self.known_ip_to_use
+        super().connect()
+
+
+class NotInternalHTTPHandler(urllib.request.HTTPHandler):
+    def http_open(self, req):
+            if not asbool(tg.config.get('urlopen_allow_internal_hostnames', 'false')):
+            # check on every open, covers redirects
+            ipValidator = validators.URLIsPrivate()
+            ipValidator.to_python(req.full_url, None)
+            # and force the open to use the known IP
+            return self.do_open(UseKnownIPHTTPConnection, req, known_ip_to_use=str(ipValidator.ip))
+        else:
+            return super().http_open(req)
+
+urllib.request.install_opener(urllib.request.build_opener(NotInternalHTTPHandler, NoInternalHTTPSHandler))
 
 
 def urlopen(url: str | urllib.request.Request, retries=3, codes=(408, 500, 502, 503, 504), timeout=None):
@@ -1073,9 +1095,6 @@ def urlopen(url: str | urllib.request.Request, retries=3, codes=(408, 500, 502,
         url_str = url
     if not url_str.startswith(('http://', 'https://')):
         raise ValueError(f'URL must be http(s), got {url_str}')
-    if not asbool(tg.config.get('urlopen_allow_internal_hostnames', 'false')):
-        # will raise error if hostname resolves to private address space:
-        validators.URLIsPrivate().to_python(url_str, None)
 
     attempts = 0
     while True:
diff --git a/Allura/allura/lib/validators.py b/Allura/allura/lib/validators.py
index 8d6556c24..8b30d2989 100644
--- a/Allura/allura/lib/validators.py
+++ b/Allura/allura/lib/validators.py
@@ -54,12 +54,13 @@ class URLIsPrivate(URL):
         value = super()._convert_to_python(value, state)
         url_components = urlsplit(value)
         try:
-            host_ip = socket.gethostbyname(url_components.netloc)
+            host_ip = socket.gethostbyname(url_components.hostname)
         except socket.gaierror:
             raise fev.Invalid("Invalid URL.", value, state)
         parse_ip = ip_address(host_ip)
         if parse_ip and parse_ip.is_private:
             raise fev.Invalid("Invalid URL.", value, state)
+        self.ip = parse_ip
         return value
 
 
diff --git a/Allura/allura/tests/test_helpers.py b/Allura/allura/tests/test_helpers.py
index 430edd7e9..8bd613ef8 100644
--- a/Allura/allura/tests/test_helpers.py
+++ b/Allura/allura/tests/test_helpers.py
@@ -32,6 +32,7 @@ from webob.exc import HTTPUnauthorized
 from ming.odm import ThreadLocalODMSession
 from markupsafe import Markup
 import pytest
+from formencode import validators as fev
 
 from allura import model as M
 from allura.lib import exceptions as exc
@@ -471,15 +472,38 @@ back\\\-slash escaped
         r'tab before \(stuff\)'
 
 
-class TestUrlOpen(TestCase):
+class TestUrlOpen:
+
+    @pytest.mark.parametrize('url', [
+        'http://127.0.0.1/',
+        'http://::1/',  # ipv6
+        'http://localhost/',
+        'https://localhost/',
+        'http://localhost:8080',
+        'https://httpbin.org/redirect-to?url=http://localhost',
+        'https://httpbin.org/redirect-to?url=https://localhost',
+    ])
+    def test_internal_invalid(self, url):
+        with pytest.raises(fev.Invalid):
+            h.urlopen(url)
+
+    @pytest.mark.parametrize('url', [
+        'http://httpbin.org/status/200',
+        'https://httpbin.org/status/200',
+        'http://httpbin.org:80/status/200',
+    ])
+    def test_ok(self, url):
+        h.urlopen(url)
 
     @patch('urllib.request.urlopen')
+    @patch.dict(config, {'urlopen_allow_internal_hostnames': 'true'})
     def test_no_error(self, urlopen):
         r = h.urlopen('http://example.com')
         assert r == urlopen.return_value
         urlopen.assert_called_once_with('http://example.com', timeout=None)
 
     @patch('urllib.request.urlopen')
+    @patch.dict(config, {'urlopen_allow_internal_hostnames': 'true'})
     def test_socket_timeout(self, urlopen):
         import socket
 
@@ -492,6 +516,7 @@ class TestUrlOpen(TestCase):
         assert urlopen.call_count == 4
 
     @patch('urllib.request.urlopen')
+    @patch.dict(config, {'urlopen_allow_internal_hostnames': 'true'})
     def test_socket_reset(self, urlopen):
         import socket
         import errno
@@ -505,6 +530,7 @@ class TestUrlOpen(TestCase):
         assert urlopen.call_count == 4
 
     @patch('urllib.request.urlopen')
+    @patch.dict(config, {'urlopen_allow_internal_hostnames': 'true'})
     def test_handled_http_error(self, urlopen):
         from urllib.error import HTTPError
 
@@ -517,6 +543,7 @@ class TestUrlOpen(TestCase):
         assert urlopen.call_count == 4
 
     @patch('urllib.request.urlopen')
+    @patch.dict(config, {'urlopen_allow_internal_hostnames': 'true'})
     def test_unhandled_http_error(self, urlopen):
         from urllib.error import HTTPError
 
diff --git a/ForgeImporters/forgeimporters/tests/test_base.py b/ForgeImporters/forgeimporters/tests/test_base.py
index ac9e0071d..b009e9f5a 100644
--- a/ForgeImporters/forgeimporters/tests/test_base.py
+++ b/ForgeImporters/forgeimporters/tests/test_base.py
@@ -55,7 +55,7 @@ class TestProjectExtractor(TestCase):
     def test_urlopen_internal_blocked(self):
         # by default this is invalid
         with pytest.raises(Invalid):
-            base.ProjectExtractor.urlopen('http://localhost:1234/blah', data='foo')
+            base.ProjectExtractor.urlopen('http://localhost:1234/blah', data=b'foo')
 
         # redirect to external site ok
         base.ProjectExtractor.urlopen('https://httpbin.org/redirect-to?url=http%3A%2F%2Fexample.com%2F')

Reply via email to