On 13/02/14 15:42, Flavio Castelli wrote:
I just realized I forgot to push this small patch upstream.
Here we go again: I forgot another patch. :) -- Flavio
From 64c7ed901c1faeb2167251d1e7ca8fe1c9e2c90a Mon Sep 17 00:00:00 2001 From: Flavio Castelli <fcaste...@suse.com> Date: Wed, 15 Jan 2014 11:53:35 +0100 Subject: [PATCH 1/2] python tests: fixed rhnsql-tests Updated the inline documentation of `convert_named_query_params` to reflect the changes made to its API. Updated the rhnsql-tests to the new API. --- backend/server/rhnSQL/driver_postgresql.py | 6 +---- .../server/test/unit-test/rhnSQL/rhnsql-tests.py | 29 ++++++---------------- 2 files changed, 9 insertions(+), 26 deletions(-) diff --git a/backend/server/rhnSQL/driver_postgresql.py b/backend/server/rhnSQL/driver_postgresql.py index 0ca640a..a814556 100644 --- a/backend/server/rhnSQL/driver_postgresql.py +++ b/backend/server/rhnSQL/driver_postgresql.py @@ -46,11 +46,7 @@ def convert_named_query_params(query): existing queries intact we'll convert them when provided to the postgresql driver. - RETURNS: tuple with: - - the new query with parameters replaced - - hash of each named parameter to an ordered list of the positions - where it was used. 
- - number of arguments found and replaced + RETURNS: the new query with parameters replaced """ log_debug(6, "Converting query for PostgreSQL: %s" % query) new_query = re.sub(r'(\W):(\w+)', r'\1%(\2)s', query.replace('%', '%%')) diff --git a/backend/server/test/unit-test/rhnSQL/rhnsql-tests.py b/backend/server/test/unit-test/rhnSQL/rhnsql-tests.py index c2855cb..39aeeb0 100644 --- a/backend/server/test/unit-test/rhnSQL/rhnsql-tests.py +++ b/backend/server/test/unit-test/rhnSQL/rhnsql-tests.py @@ -29,41 +29,28 @@ class RhnSQLTests(unittest.TestCase): def test_convert_named_query_params(self): query = "INSERT INTO people(id, name, phone) VALUES(:id, :name, :phone)" expected_query = \ - "INSERT INTO people(id, name, phone) VALUES($1, $2, $3)" + "INSERT INTO people(id, name, phone) VALUES(%(id)s, %(name)s, %(phone)s)" - (new_query, param_index, args_found) = convert_named_query_params(query) + new_query = convert_named_query_params(query) self.assertEquals(expected_query, new_query) - self.assertEquals(3, len(param_index.keys())) - self.assertEquals(3, args_found) - self.assertEquals([1], param_index['id']) - self.assertEquals([2], param_index['name']) - self.assertEquals([3], param_index['phone']) def test_convert_named_params_none_required(self): query = "SELECT * FROM people" - (new_query, param_index, args_found) = convert_named_query_params(query) + new_query = convert_named_query_params(query) self.assertEquals(query, new_query) - self.assertEquals(0, len(param_index.keys())) def test_convert_named_params_multiple_uses(self): query = "INSERT INTO people(a, b, c, d) VALUES(:a, :b, :a, :b)" expected_query = \ - "INSERT INTO people(a, b, c, d) VALUES($1, $2, $3, $4)" + "INSERT INTO people(a, b, c, d) VALUES(%(a)s, %(b)s, %(a)s, %(b)s)" - (new_query, param_index, args_found) = convert_named_query_params(query) + new_query = convert_named_query_params(query) self.assertEquals(expected_query, new_query) - self.assertEquals(4, args_found) - self.assertEquals(2, 
len(param_index.keys())) - self.assertEquals([1, 3], param_index['a']) - self.assertEquals([2, 4], param_index['b']) def test_date_format_conversion_issue(self): query = "SELECT TO_CHAR(issued, 'YYYY-MM-DD HH24:MI:SS') issued FROM rhnSatelliteCert WHERE id=:id, name=:name" - expected_query = "SELECT TO_CHAR(issued, 'YYYY-MM-DD HH24:MI:SS') issued FROM rhnSatelliteCert WHERE id=$1, name=$2" - (new_query, param_index, args_found) = convert_named_query_params(query) + expected_query = "SELECT TO_CHAR(issued, 'YYYY-MM-DD HH24:MI:SS') issued FROM rhnSatelliteCert WHERE id=%(id)s, name=%(name)s" + new_query = convert_named_query_params(query) self.assertEquals(expected_query, new_query) - self.assertEquals(2, args_found) - self.assertEquals(2, len(param_index.keys())) - self.assertEquals([1], param_index['id']) - self.assertEquals([2], param_index['name']) + -- 1.8.4.5
From dbccd213bca83157eb10fb1a633465dc67a589a6 Mon Sep 17 00:00:00 2001 From: Flavio Castelli <fcaste...@suse.com> Date: Fri, 18 Oct 2013 18:01:09 +0200 Subject: [PATCH 2/2] python: consolidate backend/server tests Moved the only unit test defined under `backend/server/test/unittest` into the directory containing all the other test files. --- backend/server/test/unit-test/test_auditlog.py | 203 +++++++++++++++++++++++++ backend/server/test/unittest/test_auditlog.py | 203 ------------------------- 2 files changed, 203 insertions(+), 203 deletions(-) create mode 100644 backend/server/test/unit-test/test_auditlog.py delete mode 100644 backend/server/test/unittest/test_auditlog.py diff --git a/backend/server/test/unit-test/test_auditlog.py b/backend/server/test/unit-test/test_auditlog.py new file mode 100644 index 0000000..47cca58 --- /dev/null +++ b/backend/server/test/unit-test/test_auditlog.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2011 SUSE LINUX Products GmbH, Nuernberg, Germany. +# +# This software is licensed to you under the GNU General Public License, +# version 2 (GPLv2). There is NO WARRANTY for this software, express or +# implied, including the implied warranties of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 +# along with this software; if not, see +# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. +# +# Red Hat trademarks are not licensed under GPLv2. 
No permission is +# granted to use or replicate Red Hat trademarks that are incorporated +# in this software or its documentation + +import unittest +from StringIO import StringIO +from collections import defaultdict +from xmlrpclib import Error + +from mock import Mock, patch + +from spacewalk.server.apacheRequest import apacheRequest + +import spacewalk.server.auditlog as auditlog +from spacewalk.server.auditlog import auditlog_xmlrpc, AuditLogException + + +class AuditLogTest(unittest.TestCase): + @classmethod + def setUpAll(self): + auditlog._get_uid = Mock(return_value="42(geeko)") + + def setUp(self): + # mock the ServerProxy.log method so we can see how it is called + self._real_server_proxy = auditlog.ServerProxy + + self.auditlog_server = Mock() + self.auditlog_server.log = Mock() + auditlog.ServerProxy = Mock(return_value=self.auditlog_server) + + # mock the _get_config method + auditlog._read_config = Mock(return_value=(True, "bogus_url")) + + def tearDown(self): + # clean up after _mock_xmlrpc_server + auditlog.ServerProxy = self._real_server_proxy + + def test_logging_is_disabled(self): + auditlog._read_config = Mock(return_value=(False, "")) + + auditlog_xmlrpc("method", "method_name", "args", "request") + self.assertFalse(self.auditlog_server.log.called) + + def test_wrong_server_url(self): + # revert mocking of the ServerProxy.log because we want to see + # the error it raises + auditlog.ServerProxy = self._real_server_proxy + + myerr = StringIO() + with patch("sys.stderr", myerr): + self.assertRaises(AuditLogException, auditlog_xmlrpc, + "method", "method_name", "args", "request") + myerr.seek(0) + err = myerr.read() + wanted_err = ("Could not establish a connection to the AuditLog " + "server. IOError: unsupported XML-RPC protocol. " + "Is this server url correct? 
bogus_url") + assert (wanted_err in err), ( + "Error string %s\n was not found in stderr: %s" % (wanted_err, err)) + + def test_dont_log_methods_without_system_id(self): + # we need a method that doesn't have a system_id parameter + def api_method(not_system_id): pass + + auditlog_xmlrpc(api_method, "method_name", "args", "request") + self.assertFalse(self.auditlog_server.log.called) + + def test_get_server_id(self): + def api_method(self, system_id, arg1, arg2): pass + args = ('system_id_xml', 'arg1', 'arg2') + + # mock rhnServer.get rhnServer.get(sysid_xml).getid() + rhnserver_got = Mock() + rhnserver_got.getid = Mock(return_value="10001000") + auditlog.rhnServer.get = Mock(return_value=rhnserver_got) + + self.assertEqual(auditlog._get_server_id(api_method, args), + ('10001000', ('10001000', 'arg1', 'arg2'))) + self.assertEqual(auditlog.rhnServer.get.call_args, + (('system_id_xml',), {}), ) + + def test_successful_logging_without_proxy(self): + request = Mock() + request.headers_in = {"SERVER_NAME": "server_name", + "REMOTE_ADDR": "remote_addr", + "SERVER_PORT": "server_port", + "DOCUMENT_ROOT": "document_root", + "SCRIPT_FILENAME": "script_filename", + "SCRIPT_URI": "script_uri"} + auditlog._get_server_id = Mock(return_value= + ("system_id", ("arg1", "arg2"))) + def api_method(system_id): + pass + + auditlog_xmlrpc(api_method, "api_method_name", ["args"], request) + + self.assertEqual(self.auditlog_server.audit.log.call_args, + (('42(geeko)', "api_method_name('arg1', 'arg2')", + 'server_name', + {'EVT.SRC': 'BACKEND_API', + 'REQ.SCRIPT_URI': 'script_uri', + 'REQ.SCRIPT_FILENAME': 'script_filename', + 'REQ.REMOTE_ADDR': 'remote_addr', + 'REQ.DOCUMENT_ROOT': 'document_root', + 'REQ.SERVER_PORT': 'server_port'}), {})) + + def test_successful_logging_with_proxy(self): + request = Mock() + request.headers_in = {"SERVER_NAME": "server_name", + "REMOTE_ADDR": "remote_addr", + "SERVER_PORT": "server_port", + "DOCUMENT_ROOT": "document_root", + "SCRIPT_FILENAME": 
"script_filename", + "SCRIPT_URI": "script_uri", + "HTTP_X_RHN_PROXY_AUTH": "proxy_auth", + "HTTP_X_RHN_PROXY_VERSION": "proxy_version", + "HTTP_X_RHN_IP_PATH": "original_addr"} + auditlog._get_server_id = Mock(return_value= + ("system_id", ("arg1", "arg2"))) + def api_method(system_id): pass + + auditlog_xmlrpc(api_method, "api_method_name", ["args"], request) + + self.assertEqual(self.auditlog_server.audit.log.call_args, + (('42(geeko)', "api_method_name('arg1', 'arg2')", + 'server_name', + {'EVT.SRC': 'BACKEND_API', + 'REQ.SCRIPT_URI': 'script_uri', + 'REQ.SCRIPT_FILENAME': 'script_filename', + 'REQ.REMOTE_ADDR': 'remote_addr', + 'REQ.DOCUMENT_ROOT': 'document_root', + 'REQ.SERVER_PORT': 'server_port', + 'REQ.PROXY': 'proxy_auth', + 'REQ.PROXY_VERSION': 'proxy_version', + 'REQ.ORIGINAL_ADDR': 'original_addr'}), {})) + + def test_remote_auditlog_error(self): + request = Mock() + request.headers_in = defaultdict(dict) + auditlog._get_server_id = Mock(return_value= + ("system_id", ("arg1", "arg2"))) + def api_method(system_id): pass + + self.auditlog_server.audit.log = Mock(side_effect=Error) + + myerr = StringIO() + with patch("sys.stderr", myerr): + self.assertRaises(AuditLogException, auditlog_xmlrpc, api_method, + "api_method_name", ["args"], request) + myerr.seek(0) + err = myerr.read() + wanted_err = ("Got an error while talking to the AuditLogging server " + "at bogus_url. 
Error was: Error()") + assert(wanted_err in err), ( + "Error string %s\n was not found in stderr: %s" % (wanted_err, err)) + + def test_missing_header_values_sent_as_null_strings(self): + request = Mock() + request.headers_in = {"SERVER_NAME": "server_name"} + auditlog._get_server_id = Mock(return_value=("system_id", ("args",))) + def api_method(system_id): pass + + auditlog_xmlrpc(api_method, "method_name", ["args"], request) + self.assertEqual(self.auditlog_server.audit.log.call_args, + (('42(geeko)', "method_name('args',)", 'server_name', + {'EVT.SRC': 'BACKEND_API', + 'REQ.SCRIPT_URI': '', + 'REQ.SCRIPT_FILENAME': '', + 'REQ.REMOTE_ADDR': '', + 'REQ.DOCUMENT_ROOT': '', + 'REQ.SERVER_PORT': ''}), {})) + + def test_missing_header_values_in_proxy_sent_as_null_strings(self): + request = Mock() + request.headers_in = {"SERVER_NAME": "server_name", + "HTTP_X_RHN_PROXY_AUTH": "proxy"} + auditlog._get_server_id = Mock(return_value=("system_id", ("args",))) + def api_method(system_id): pass + + auditlog_xmlrpc(api_method, "method_name", ["args"], request) + self.assertEqual(self.auditlog_server.audit.log.call_args, + (('42(geeko)', "method_name('args',)", 'server_name', + {'EVT.SRC': 'BACKEND_API', + 'REQ.SCRIPT_URI': '', + 'REQ.SCRIPT_FILENAME': '', + 'REQ.REMOTE_ADDR': '', + 'REQ.DOCUMENT_ROOT': '', + 'REQ.SERVER_PORT': '', + 'REQ.PROXY': 'proxy', + 'REQ.PROXY_VERSION': '', + 'REQ.ORIGINAL_ADDR': ''}), {})) diff --git a/backend/server/test/unittest/test_auditlog.py b/backend/server/test/unittest/test_auditlog.py deleted file mode 100644 index 47cca58..0000000 --- a/backend/server/test/unittest/test_auditlog.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright (c) 2011 SUSE LINUX Products GmbH, Nuernberg, Germany. -# -# This software is licensed to you under the GNU General Public License, -# version 2 (GPLv2). 
There is NO WARRANTY for this software, express or -# implied, including the implied warranties of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 -# along with this software; if not, see -# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. -# -# Red Hat trademarks are not licensed under GPLv2. No permission is -# granted to use or replicate Red Hat trademarks that are incorporated -# in this software or its documentation - -import unittest -from StringIO import StringIO -from collections import defaultdict -from xmlrpclib import Error - -from mock import Mock, patch - -from spacewalk.server.apacheRequest import apacheRequest - -import spacewalk.server.auditlog as auditlog -from spacewalk.server.auditlog import auditlog_xmlrpc, AuditLogException - - -class AuditLogTest(unittest.TestCase): - @classmethod - def setUpAll(self): - auditlog._get_uid = Mock(return_value="42(geeko)") - - def setUp(self): - # mock the ServerProxy.log method so we can see how it is called - self._real_server_proxy = auditlog.ServerProxy - - self.auditlog_server = Mock() - self.auditlog_server.log = Mock() - auditlog.ServerProxy = Mock(return_value=self.auditlog_server) - - # mock the _get_config method - auditlog._read_config = Mock(return_value=(True, "bogus_url")) - - def tearDown(self): - # clean up after _mock_xmlrpc_server - auditlog.ServerProxy = self._real_server_proxy - - def test_logging_is_disabled(self): - auditlog._read_config = Mock(return_value=(False, "")) - - auditlog_xmlrpc("method", "method_name", "args", "request") - self.assertFalse(self.auditlog_server.log.called) - - def test_wrong_server_url(self): - # revert mocking of the ServerProxy.log because we want to see - # the error it raises - auditlog.ServerProxy = self._real_server_proxy - - myerr = StringIO() - with patch("sys.stderr", myerr): - self.assertRaises(AuditLogException, auditlog_xmlrpc, - "method", "method_name", "args", "request") - myerr.seek(0) - err = 
myerr.read() - wanted_err = ("Could not establish a connection to the AuditLog " - "server. IOError: unsupported XML-RPC protocol. " - "Is this server url correct? bogus_url") - assert (wanted_err in err), ( - "Error string %s\n was not found in stderr: %s" % (wanted_err, err)) - - def test_dont_log_methods_without_system_id(self): - # we need a method that doesn't have a system_id parameter - def api_method(not_system_id): pass - - auditlog_xmlrpc(api_method, "method_name", "args", "request") - self.assertFalse(self.auditlog_server.log.called) - - def test_get_server_id(self): - def api_method(self, system_id, arg1, arg2): pass - args = ('system_id_xml', 'arg1', 'arg2') - - # mock rhnServer.get rhnServer.get(sysid_xml).getid() - rhnserver_got = Mock() - rhnserver_got.getid = Mock(return_value="10001000") - auditlog.rhnServer.get = Mock(return_value=rhnserver_got) - - self.assertEqual(auditlog._get_server_id(api_method, args), - ('10001000', ('10001000', 'arg1', 'arg2'))) - self.assertEqual(auditlog.rhnServer.get.call_args, - (('system_id_xml',), {}), ) - - def test_successful_logging_without_proxy(self): - request = Mock() - request.headers_in = {"SERVER_NAME": "server_name", - "REMOTE_ADDR": "remote_addr", - "SERVER_PORT": "server_port", - "DOCUMENT_ROOT": "document_root", - "SCRIPT_FILENAME": "script_filename", - "SCRIPT_URI": "script_uri"} - auditlog._get_server_id = Mock(return_value= - ("system_id", ("arg1", "arg2"))) - def api_method(system_id): - pass - - auditlog_xmlrpc(api_method, "api_method_name", ["args"], request) - - self.assertEqual(self.auditlog_server.audit.log.call_args, - (('42(geeko)', "api_method_name('arg1', 'arg2')", - 'server_name', - {'EVT.SRC': 'BACKEND_API', - 'REQ.SCRIPT_URI': 'script_uri', - 'REQ.SCRIPT_FILENAME': 'script_filename', - 'REQ.REMOTE_ADDR': 'remote_addr', - 'REQ.DOCUMENT_ROOT': 'document_root', - 'REQ.SERVER_PORT': 'server_port'}), {})) - - def test_successful_logging_with_proxy(self): - request = Mock() - 
request.headers_in = {"SERVER_NAME": "server_name", - "REMOTE_ADDR": "remote_addr", - "SERVER_PORT": "server_port", - "DOCUMENT_ROOT": "document_root", - "SCRIPT_FILENAME": "script_filename", - "SCRIPT_URI": "script_uri", - "HTTP_X_RHN_PROXY_AUTH": "proxy_auth", - "HTTP_X_RHN_PROXY_VERSION": "proxy_version", - "HTTP_X_RHN_IP_PATH": "original_addr"} - auditlog._get_server_id = Mock(return_value= - ("system_id", ("arg1", "arg2"))) - def api_method(system_id): pass - - auditlog_xmlrpc(api_method, "api_method_name", ["args"], request) - - self.assertEqual(self.auditlog_server.audit.log.call_args, - (('42(geeko)', "api_method_name('arg1', 'arg2')", - 'server_name', - {'EVT.SRC': 'BACKEND_API', - 'REQ.SCRIPT_URI': 'script_uri', - 'REQ.SCRIPT_FILENAME': 'script_filename', - 'REQ.REMOTE_ADDR': 'remote_addr', - 'REQ.DOCUMENT_ROOT': 'document_root', - 'REQ.SERVER_PORT': 'server_port', - 'REQ.PROXY': 'proxy_auth', - 'REQ.PROXY_VERSION': 'proxy_version', - 'REQ.ORIGINAL_ADDR': 'original_addr'}), {})) - - def test_remote_auditlog_error(self): - request = Mock() - request.headers_in = defaultdict(dict) - auditlog._get_server_id = Mock(return_value= - ("system_id", ("arg1", "arg2"))) - def api_method(system_id): pass - - self.auditlog_server.audit.log = Mock(side_effect=Error) - - myerr = StringIO() - with patch("sys.stderr", myerr): - self.assertRaises(AuditLogException, auditlog_xmlrpc, api_method, - "api_method_name", ["args"], request) - myerr.seek(0) - err = myerr.read() - wanted_err = ("Got an error while talking to the AuditLogging server " - "at bogus_url. 
Error was: Error()") - assert(wanted_err in err), ( - "Error string %s\n was not found in stderr: %s" % (wanted_err, err)) - - def test_missing_header_values_sent_as_null_strings(self): - request = Mock() - request.headers_in = {"SERVER_NAME": "server_name"} - auditlog._get_server_id = Mock(return_value=("system_id", ("args",))) - def api_method(system_id): pass - - auditlog_xmlrpc(api_method, "method_name", ["args"], request) - self.assertEqual(self.auditlog_server.audit.log.call_args, - (('42(geeko)', "method_name('args',)", 'server_name', - {'EVT.SRC': 'BACKEND_API', - 'REQ.SCRIPT_URI': '', - 'REQ.SCRIPT_FILENAME': '', - 'REQ.REMOTE_ADDR': '', - 'REQ.DOCUMENT_ROOT': '', - 'REQ.SERVER_PORT': ''}), {})) - - def test_missing_header_values_in_proxy_sent_as_null_strings(self): - request = Mock() - request.headers_in = {"SERVER_NAME": "server_name", - "HTTP_X_RHN_PROXY_AUTH": "proxy"} - auditlog._get_server_id = Mock(return_value=("system_id", ("args",))) - def api_method(system_id): pass - - auditlog_xmlrpc(api_method, "method_name", ["args"], request) - self.assertEqual(self.auditlog_server.audit.log.call_args, - (('42(geeko)', "method_name('args',)", 'server_name', - {'EVT.SRC': 'BACKEND_API', - 'REQ.SCRIPT_URI': '', - 'REQ.SCRIPT_FILENAME': '', - 'REQ.REMOTE_ADDR': '', - 'REQ.DOCUMENT_ROOT': '', - 'REQ.SERVER_PORT': '', - 'REQ.PROXY': 'proxy', - 'REQ.PROXY_VERSION': '', - 'REQ.ORIGINAL_ADDR': ''}), {})) -- 1.8.4.5
_______________________________________________ Spacewalk-devel mailing list Spacewalk-devel@redhat.com https://www.redhat.com/mailman/listinfo/spacewalk-devel