Log message for revision 112893: Merge r107029 from the tseaver-fix_wsgi branch. - Start test coverage for WSGIPublisher.
Changed: U Zope/trunk/src/ZPublisher/WSGIPublisher.py A Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py -=- Modified: Zope/trunk/src/ZPublisher/WSGIPublisher.py =================================================================== --- Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:08:52 UTC (rev 112892) +++ Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893) @@ -13,7 +13,6 @@ """ Python Object Publisher -- Publish Python objects on web servers """ from cStringIO import StringIO -import re import sys import time @@ -25,6 +24,12 @@ from ZPublisher.maybe_lock import allocate_lock from ZPublisher.mapply import mapply +_NOW = None # overwrite for testing +def _now(): + if _NOW is not None: + return _NOW + return time.time() + class WSGIResponse(HTTPResponse): """A response object for WSGI @@ -34,57 +39,53 @@ Most significantly, streaming is not (yet) supported. """ _streaming = 0 + _http_version = None + _server_version = None + _http_connection = None - def __str__(self, - html_search=re.compile('<html>',re.I).search, - ): + def __str__(self): + if self._wrote: if self._chunking: return '0\r\n\r\n' else: return '' - headers=self.headers - body=self.body + headers = self.headers + body = self.body # set 204 (no content) status if 200 and response is empty # and not streaming - if not headers.has_key('content-type') and \ - not headers.has_key('content-length') and \ - not self._streaming and \ - self.status == 200: + if ('content-type' not in headers and + 'content-length' not in headers and + not self._streaming and self.status == 200): self.setStatus('nocontent') # add content length if not streaming - if not headers.has_key('content-length') and \ - not self._streaming: - self.setHeader('content-length',len(body)) + content_length = headers.get('content-length') + if content_length is None and not self._streaming: + self.setHeader('content-length', len(body)) - content_length= headers.get('content-length', None) - if content_length>0 : - self.setHeader('content-length', content_length) + chunks = [] + append = chunks.append - headersl=[] - append=headersl.append - - status=headers.get('status', '200 OK') - # status header must come first. - append("HTTP/%s %s" % (self._http_version or '1.0' , status)) - if headers.has_key('status'): - del headers['status'] + version = self._http_version or '1.0' + append("HTTP/%s %d %s" % (version, self.status, self.errmsg)) # add zserver headers - append('Server: %s' % self._server_version) - append('Date: %s' % build_http_date(time.time())) + if self._server_version is not None: + append('Server: %s' % self._server_version) - if self._http_version=='1.0': - if self._http_connection=='keep-alive' and \ - self.headers.has_key('content-length'): - self.setHeader('Connection','Keep-Alive') + append('Date: %s' % build_http_date(_now())) + + if self._http_version == '1.0': + if (self._http_connection == 'keep-alive' and + 'content-length' in self.headers): + self.setHeader('Connection', 'Keep-Alive') else: - self.setHeader('Connection','close') + self.setHeader('Connection', 'close') # Close the connection if we have been asked to. # Use chunking if streaming output. @@ -109,11 +110,18 @@ start=l+1 l=key.find('-',start) append("%s: %s" % (key, val)) + if self.cookies: - headersl=headersl+self._cookie_list() - headersl[len(headersl):]=[self.accumulated_headers, body] - return "\r\n".join(headersl) + chunks.extend(self._cookie_list()) + for key, value in self.accumulated_headers: + append("%s: %s" % (key, value)) + + append('') # RFC 2616 mandates empty line between headers and payload + append(body) + + return "\r\n".join(chunks) + class Retry(Exception): """Raise this to retry a request Added: Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py =================================================================== --- Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py (rev 0) +++ Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893) @@ -0,0 +1,135 @@ +############################################################################## +# +# Copyright (c) 2009 Zope Foundation and Contributors. All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE +# +############################################################################## +import unittest + +class WSGIResponseTests(unittest.TestCase): + + _old_NOW = None + + def tearDown(self): + if self._old_NOW is not None: + self._setNOW(self._old_NOW) + + def _getTargetClass(self): + from ZPublisher.WSGIPublisher import WSGIResponse + return WSGIResponse + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def _setNOW(self, value): + from ZPublisher import WSGIPublisher + WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW + + def test___str__already_wrote_not_chunking(self): + response = self._makeOne() + response._wrote = True + response._chunking = False + self.assertEqual(str(response), '') + + def test___str__already_wrote_w_chunking(self): + response = self._makeOne() + response._wrote = True + response._chunking = True + self.assertEqual(str(response), '0\r\n\r\n') + + def test___str__sets_204_on_empty_not_streaming(self): + response = self._makeOne() + str(response) # not checking value + self.assertEqual(response.status, 204) + + def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self): + response = self._makeOne() + response.setStatus(302) + str(response) # not checking value + self.assertEqual(response.status, 302) + + def test___str___sets_content_length_if_missing(self): + response = self._makeOne() + response.setBody('TESTING') + str(response) # not checking value + self.assertEqual(response.getHeader('Content-Length'), + str(len('TESTING'))) + + def test___str___skips_setting_content_length_if_missing_w_streaming(self): + response = self._makeOne() + response._streaming = True + response.body = 'TESTING' + str(response) # not checking value + self.failIf(response.getHeader('Content-Length')) + + def test___str___w_default_http_version(self): + response = self._makeOne() + response.setBody('TESTING') + result = str(response).splitlines() + self.assertEqual(result[0], 'HTTP/1.0 200 OK') + + def test___str___w_explicit_http_version(self): + response = self._makeOne() + response.setBody('TESTING') + response._http_version = '1.1' + result = str(response).splitlines() + self.assertEqual(result[0], 'HTTP/1.1 200 OK') + + def test___str___skips_Server_header_wo_server_version_set(self): + response = self._makeOne() + response.setBody('TESTING') + result = str(response).splitlines() + sv = [x for x in result if x.lower().startswith('server-version')] + self.failIf(sv) + + def test___str___includes_Server_header_w_server_version_set(self): + response = self._makeOne() + response._server_version = 'TESTME' + response.setBody('TESTING') + result = str(response).splitlines() + self.assertEqual(result[1], 'Server: TESTME') + + def test___str___includes_Date_header(self): + import time + WHEN = time.localtime() + self._setNOW(time.mktime(WHEN)) + response = self._makeOne() + response.setBody('TESTING') + result = str(response).splitlines() + self.assertEqual(result[1], 'Date: %s' % + time.strftime('%a, %d %b %Y %H:%M:%S GMT', + time.gmtime(time.mktime(WHEN)))) + + def test___str___HTTP_1_0_keep_alive_w_content_length(self): + response = self._makeOne() + response._http_version = '1.0' + response._http_connection = 'keep-alive' + response.setBody('TESTING') + str(response) # not checking value + self.assertEqual(response.getHeader('Connection'), 'Keep-Alive') + + def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self): + response = self._makeOne() + response._http_version = '1.0' + response._http_connection = 'keep-alive' + response._streaming = True + str(response) # not checking value + self.assertEqual(response.getHeader('Connection'), 'close') + + def test___str___HTTP_1_0_not_keep_alive_w_content_length(self): + response = self._makeOne() + response._http_version = '1.0' + response.setBody('TESTING') + str(response) # not checking value + self.assertEqual(response.getHeader('Connection'), 'close') + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(WSGIResponseTests)) + return suite _______________________________________________ Zope-Checkins maillist - Zope-Checkins@zope.org https://mail.zope.org/mailman/listinfo/zope-checkins