http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/cursor.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/cursor.py b/python/phoenixdb/cursor.py new file mode 100644 index 0000000..8be7bed --- /dev/null +++ b/python/phoenixdb/cursor.py @@ -0,0 +1,347 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import collections +from phoenixdb.types import TypeHelper +from phoenixdb.errors import ProgrammingError, InternalError +from phoenixdb.avatica.proto import common_pb2 + +__all__ = ['Cursor', 'ColumnDescription', 'DictCursor'] + +logger = logging.getLogger(__name__) + +# TODO see note in Cursor.rowcount() +MAX_INT = 2 ** 64 - 1 + +ColumnDescription = collections.namedtuple('ColumnDescription', 'name type_code display_size internal_size precision scale null_ok') +"""Named tuple for representing results from :attr:`Cursor.description`.""" + + +class Cursor(object): + """Database cursor for executing queries and iterating over results. + + You should not construct this object manually, use :meth:`Connection.cursor() <phoenixdb.connection.Connection.cursor>` instead. + """ + + arraysize = 1 + """ + Read/write attribute specifying the number of rows to fetch + at a time with :meth:`fetchmany`. It defaults to 1 meaning to + fetch a single row at a time. + """ + + itersize = 2000 + """ + Read/write attribute specifying the number of rows to fetch + from the backend at each network roundtrip during iteration + on the cursor. The default is 2000. + """ + + def __init__(self, connection, id=None): + self._connection = connection + self._id = id + self._signature = None + self._column_data_types = [] + self._frame = None + self._pos = None + self._closed = False + self.arraysize = self.__class__.arraysize + self.itersize = self.__class__.itersize + self._updatecount = -1 + + def __del__(self): + if not self._connection._closed and not self._closed: + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if not self._closed: + self.close() + + def __iter__(self): + return self + + def __next__(self): + row = self.fetchone() + if row is None: + raise StopIteration + return row + + next = __next__ + + def close(self): + """Closes the cursor. + No further operations are allowed once the cursor is closed. + + If the cursor is used in a ``with`` statement, this method will + be automatically called at the end of the ``with`` block. + """ + if self._closed: + raise ProgrammingError('the cursor is already closed') + if self._id is not None: + self._connection._client.close_statement(self._connection._id, self._id) + self._id = None + self._signature = None + self._column_data_types = [] + self._frame = None + self._pos = None + self._closed = True + + @property + def closed(self): + """Read-only attribute specifying if the cursor is closed or not.""" + return self._closed + + @property + def description(self): + if self._signature is None: + return None + description = [] + for column in self._signature.columns: + description.append(ColumnDescription( + column.column_name, + column.type.name, + column.display_size, + None, + column.precision, + column.scale, + None if column.nullable == 2 else bool(column.nullable), + )) + return description + + def _set_id(self, id): + if self._id is not None and self._id != id: + self._connection._client.close_statement(self._connection._id, self._id) + self._id = id + + def _set_signature(self, signature): + self._signature = signature + self._column_data_types = [] + self._parameter_data_types = [] + if signature is None: + return + + for column in signature.columns: + dtype = TypeHelper.from_class(column.column_class_name) + self._column_data_types.append(dtype) + + for parameter in signature.parameters: + dtype = TypeHelper.from_class(parameter.class_name) + self._parameter_data_types.append(dtype) + + def _set_frame(self, frame): + self._frame = frame + self._pos = None + + if frame is not None: + if frame.rows: + self._pos = 0 + elif not frame.done: + raise InternalError('got an empty frame, but the statement is not done yet') + + def _fetch_next_frame(self): + offset = self._frame.offset + len(self._frame.rows) + frame = self._connection._client.fetch( + self._connection._id, self._id, + offset=offset, frame_max_size=self.itersize) + self._set_frame(frame) + + def _process_results(self, results): + if results: + result = results[0] + if result.own_statement: + self._set_id(result.statement_id) + self._set_signature(result.signature if result.HasField('signature') else None) + self._set_frame(result.first_frame if result.HasField('first_frame') else None) + self._updatecount = result.update_count + + def _transform_parameters(self, parameters): + typed_parameters = [] + for value, data_type in zip(parameters, self._parameter_data_types): + field_name, rep, mutate_to, cast_from = data_type + typed_value = common_pb2.TypedValue() + + if value is None: + typed_value.null = True + typed_value.type = common_pb2.NULL + else: + typed_value.null = False + + # use the mutator function + if mutate_to is not None: + value = mutate_to(value) + + typed_value.type = rep + setattr(typed_value, field_name, value) + + typed_parameters.append(typed_value) + return typed_parameters + + def execute(self, operation, parameters=None): + if self._closed: + raise ProgrammingError('the cursor is already closed') + self._updatecount = -1 + self._set_frame(None) + if parameters is None: + if self._id is None: + self._set_id(self._connection._client.create_statement(self._connection._id)) + results = self._connection._client.prepare_and_execute( + self._connection._id, self._id, + operation, first_frame_max_size=self.itersize) + self._process_results(results) + else: + statement = self._connection._client.prepare( + self._connection._id, operation) + self._set_id(statement.id) + self._set_signature(statement.signature) + + results = self._connection._client.execute( + self._connection._id, self._id, + statement.signature, self._transform_parameters(parameters), + first_frame_max_size=self.itersize) + self._process_results(results) + + def executemany(self, operation, seq_of_parameters): + if self._closed: + raise ProgrammingError('the cursor is already closed') + self._updatecount = -1 + self._set_frame(None) + statement = self._connection._client.prepare( + self._connection._id, operation, max_rows_total=0) + self._set_id(statement.id) + self._set_signature(statement.signature) + for parameters in seq_of_parameters: + self._connection._client.execute( + self._connection._id, self._id, + statement.signature, self._transform_parameters(parameters), + first_frame_max_size=0) + + def _transform_row(self, row): + """Transforms a Row into Python values. + + :param row: + A ``common_pb2.Row`` object. + + :returns: + A list of values casted into the correct Python types. + + :raises: + NotImplementedError + """ + tmp_row = [] + + for i, column in enumerate(row.value): + if column.has_array_value: + raise NotImplementedError('array types are not supported') + elif column.scalar_value.null: + tmp_row.append(None) + else: + field_name, rep, mutate_to, cast_from = self._column_data_types[i] + + # get the value from the field_name + value = getattr(column.scalar_value, field_name) + + # cast the value + if cast_from is not None: + value = cast_from(value) + + tmp_row.append(value) + return tmp_row + + def fetchone(self): + if self._frame is None: + raise ProgrammingError('no select statement was executed') + if self._pos is None: + return None + rows = self._frame.rows + row = self._transform_row(rows[self._pos]) + self._pos += 1 + if self._pos >= len(rows): + self._pos = None + if not self._frame.done: + self._fetch_next_frame() + return row + + def fetchmany(self, size=None): + if size is None: + size = self.arraysize + rows = [] + while size > 0: + row = self.fetchone() + if row is None: + break + rows.append(row) + size -= 1 + return rows + + def fetchall(self): + rows = [] + while True: + row = self.fetchone() + if row is None: + break + rows.append(row) + return rows + + def setinputsizes(self, sizes): + pass + + def setoutputsize(self, size, column=None): + pass + + @property + def connection(self): + """Read-only attribute providing access to the :class:`Connection <phoenixdb.connection.Connection>` + object this cursor was created from.""" + return self._connection + + @property + def rowcount(self): + """Read-only attribute specifying the number of rows affected by + the last executed DML statement or -1 if the number cannot be + determined. Note that this will always be set to -1 for select + queries.""" + # TODO instead of -1, this ends up being set to Integer.MAX_VALUE + if self._updatecount == MAX_INT: + return -1 + return self._updatecount + + @property + def rownumber(self): + """Read-only attribute providing the current 0-based index of the + cursor in the result set or ``None`` if the index cannot be + determined. + + The index can be seen as index of the cursor in a sequence + (the result set). The next fetch operation will fetch the + row indexed by :attr:`rownumber` in that sequence. + """ + if self._frame is not None and self._pos is not None: + return self._frame.offset + self._pos + return self._pos + + +class DictCursor(Cursor): + """A cursor which returns results as a dictionary""" + + def _transform_row(self, row): + row = super(DictCursor, self)._transform_row(row) + d = {} + for ind, val in enumerate(row): + d[self._signature.columns[ind].column_name] = val + return d
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/errors.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/errors.py b/python/phoenixdb/errors.py new file mode 100644 index 0000000..a046c0d --- /dev/null +++ b/python/phoenixdb/errors.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [ + 'Warning', 'Error', 'InterfaceError', 'DatabaseError', 'DataError', + 'OperationalError', 'IntegrityError', 'InternalError', + 'ProgrammingError', 'NotSupportedError', +] + +try: + _StandardError = StandardError +except NameError: + _StandardError = Exception + + +class Warning(_StandardError): + """Not used by this package, only defined for compatibility + with DB API 2.0.""" + + +class Error(_StandardError): + """Exception that is the base class of all other error exceptions. + You can use this to catch all errors with one single except statement.""" + + def __init__(self, message, code=None, sqlstate=None, cause=None): + super(_StandardError, self).__init__(message, code, sqlstate, cause) + + @property + def message(self): + return self.args[0] + + @property + def code(self): + return self.args[1] + + @property + def sqlstate(self): + return self.args[2] + + @property + def cause(self): + return self.args[3] + + +class InterfaceError(Error): + """Exception raised for errors that are related to the database + interface rather than the database itself.""" + + +class DatabaseError(Error): + """Exception raised for errors that are related to the database.""" + + +class DataError(DatabaseError): + """Exception raised for errors that are due to problems with the + processed data like division by zero, numeric value out of range, + etc.""" + + +class OperationalError(DatabaseError): + """Raised for errors that are related to the database's operation and not + necessarily under the control of the programmer, e.g. an unexpected + disconnect occurs, the data source name is not found, a transaction could + not be processed, a memory allocation error occurred during + processing, etc.""" + + +class IntegrityError(DatabaseError): + """Raised when the relational integrity of the database is affected, e.g. a foreign key check fails.""" + + +class InternalError(DatabaseError): + """Raised when the database encounters an internal problem.""" + + +class ProgrammingError(DatabaseError): + """Raises for programming errors, e.g. table not found, syntax error, etc.""" + + +class NotSupportedError(DatabaseError): + """Raised when using an API that is not supported by the database.""" http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/__init__.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/__init__.py b/python/phoenixdb/tests/__init__.py new file mode 100644 index 0000000..ec9a79b --- /dev/null +++ b/python/phoenixdb/tests/__init__.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest +import phoenixdb + +TEST_DB_URL = os.environ.get('PHOENIXDB_TEST_DB_URL') + + +@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database") +class DatabaseTestCase(unittest.TestCase): + + def setUp(self): + self.conn = phoenixdb.connect(TEST_DB_URL, autocommit=True) + self.cleanup_tables = [] + + def tearDown(self): + self.doCleanups() + self.conn.close() + + def addTableCleanup(self, name): + def dropTable(): + with self.conn.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS {}".format(name)) + self.addCleanup(dropTable) + + def createTable(self, name, columns): + with self.conn.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS {}".format(name)) + cursor.execute("CREATE TABLE {} ({})".format(name, columns)) + self.addTableCleanup(name) http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/dbapi20.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/dbapi20.py b/python/phoenixdb/tests/dbapi20.py new file mode 100644 index 0000000..f176400 --- /dev/null +++ b/python/phoenixdb/tests/dbapi20.py @@ -0,0 +1,857 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +''' Python DB API 2.0 driver compliance unit test suite. + + This software is Public Domain and may be used without restrictions. + + "Now we have booze and barflies entering the discussion, plus rumours of + DBAs on drugs... and I won't tell you what flashes through my mind each + time I read the subject line with 'Anal Compliance' in it. All around + this is turning out to be a thoroughly unwholesome unit test." + + -- Ian Bicking +''' + +__version__ = '1.14.3' + +import unittest +import time +import sys + +if sys.version[0] >= '3': #python 3.x + _BaseException = Exception + def _failUnless(self, expr, msg=None): + self.assertTrue(expr, msg) +else: #python 2.x + from exceptions import StandardError as _BaseException + def _failUnless(self, expr, msg=None): + self.failUnless(expr, msg) ## deprecated since Python 2.6 + +def str2bytes(sval): + if sys.version_info < (3,0) and isinstance(sval, str): + sval = sval.decode("latin1") + return sval.encode("latin1") #python 3 make unicode into bytes + +class DatabaseAPI20Test(unittest.TestCase): + ''' Test a database self.driver for DB API 2.0 compatibility. + This implementation tests Gadfly, but the TestCase + is structured so that other self.drivers can subclass this + test case to ensure compiliance with the DB-API. It is + expected that this TestCase may be expanded in the future + if ambiguities or edge conditions are discovered. + + The 'Optional Extensions' are not yet being tested. + + self.drivers should subclass this test, overriding setUp, tearDown, + self.driver, connect_args and connect_kw_args. Class specification + should be as follows: + + import dbapi20 + class mytest(dbapi20.DatabaseAPI20Test): + [...] + + Don't 'import DatabaseAPI20Test from dbapi20', or you will + confuse the unit tester - just 'import dbapi20'. + ''' + + # The self.driver module. This should be the module where the 'connect' + # method is to be found + driver = None + connect_args = () # List of arguments to pass to connect + connect_kw_args = {} # Keyword arguments for connect + table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables + + ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix + ddl2 = 'create table %sbarflys (name varchar(20), drink varchar(30))' % table_prefix + xddl1 = 'drop table %sbooze' % table_prefix + xddl2 = 'drop table %sbarflys' % table_prefix + insert = 'insert' + + lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase + + # Some drivers may need to override these helpers, for example adding + # a 'commit' after the execute. + def executeDDL1(self,cursor): + cursor.execute(self.ddl1) + + def executeDDL2(self,cursor): + cursor.execute(self.ddl2) + + def setUp(self): + ''' self.drivers should override this method to perform required setup + if any is necessary, such as creating the database. + ''' + pass + + def tearDown(self): + ''' self.drivers should override this method to perform required cleanup + if any is necessary, such as deleting the test database. + The default drops the tables that may be created. + ''' + try: + con = self._connect() + try: + cur = con.cursor() + for ddl in (self.xddl1,self.xddl2): + try: + cur.execute(ddl) + con.commit() + except self.driver.Error: + # Assume table didn't exist. Other tests will check if + # execute is busted. + pass + finally: + con.close() + except _BaseException: + pass + + def _connect(self): + try: + r = self.driver.connect( + *self.connect_args,**self.connect_kw_args + ) + except AttributeError: + self.fail("No connect method found in self.driver module") + return r + + def test_connect(self): + con = self._connect() + con.close() + + def test_apilevel(self): + try: + # Must exist + apilevel = self.driver.apilevel + # Must equal 2.0 + self.assertEqual(apilevel,'2.0') + except AttributeError: + self.fail("Driver doesn't define apilevel") + + def test_threadsafety(self): + try: + # Must exist + threadsafety = self.driver.threadsafety + # Must be a valid value + _failUnless(self, threadsafety in (0,1,2,3)) + except AttributeError: + self.fail("Driver doesn't define threadsafety") + + def test_paramstyle(self): + try: + # Must exist + paramstyle = self.driver.paramstyle + # Must be a valid value + _failUnless(self, paramstyle in ( + 'qmark','numeric','named','format','pyformat' + )) + except AttributeError: + self.fail("Driver doesn't define paramstyle") + + def test_Exceptions(self): + # Make sure required exceptions exist, and are in the + # defined heirarchy. + if sys.version[0] == '3': #under Python 3 StardardError no longer exists + self.assertTrue(issubclass(self.driver.Warning,Exception)) + self.assertTrue(issubclass(self.driver.Error,Exception)) + else: + self.failUnless(issubclass(self.driver.Warning,StandardError)) + self.failUnless(issubclass(self.driver.Error,StandardError)) + + _failUnless(self, + issubclass(self.driver.InterfaceError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.DatabaseError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.OperationalError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.IntegrityError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.InternalError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.ProgrammingError,self.driver.Error) + ) + _failUnless(self, + issubclass(self.driver.NotSupportedError,self.driver.Error) + ) + + def test_ExceptionsAsConnectionAttributes(self): + # OPTIONAL EXTENSION + # Test for the optional DB API 2.0 extension, where the exceptions + # are exposed as attributes on the Connection object + # I figure this optional extension will be implemented by any + # driver author who is using this test suite, so it is enabled + # by default. + con = self._connect() + drv = self.driver + _failUnless(self,con.Warning is drv.Warning) + _failUnless(self,con.Error is drv.Error) + _failUnless(self,con.InterfaceError is drv.InterfaceError) + _failUnless(self,con.DatabaseError is drv.DatabaseError) + _failUnless(self,con.OperationalError is drv.OperationalError) + _failUnless(self,con.IntegrityError is drv.IntegrityError) + _failUnless(self,con.InternalError is drv.InternalError) + _failUnless(self,con.ProgrammingError is drv.ProgrammingError) + _failUnless(self,con.NotSupportedError is drv.NotSupportedError) + + + def test_commit(self): + con = self._connect() + try: + # Commit must work, even if it doesn't do anything + con.commit() + finally: + con.close() + + def test_rollback(self): + con = self._connect() + # If rollback is defined, it should either work or throw + # the documented exception + if hasattr(con,'rollback'): + try: + con.rollback() + except self.driver.NotSupportedError: + pass + + def test_cursor(self): + con = self._connect() + try: + cur = con.cursor() + finally: + con.close() + + def test_cursor_isolation(self): + con = self._connect() + try: + # Make sure cursors created from the same connection have + # the documented transaction isolation level + cur1 = con.cursor() + cur2 = con.cursor() + self.executeDDL1(cur1) + cur1.execute("%s into %sbooze values ('Victoria Bitter')" % ( + self.insert, self.table_prefix + )) + cur2.execute("select name from %sbooze" % self.table_prefix) + booze = cur2.fetchall() + self.assertEqual(len(booze),1) + self.assertEqual(len(booze[0]),1) + self.assertEqual(booze[0][0],'Victoria Bitter') + finally: + con.close() + + def test_description(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + self.assertEqual(cur.description,None, + 'cursor.description should be none after executing a ' + 'statement that can return no rows (such as DDL)' + ) + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(len(cur.description),1, + 'cursor.description describes too many columns' + ) + self.assertEqual(len(cur.description[0]),7, + 'cursor.description[x] tuples must have 7 elements' + ) + self.assertEqual(cur.description[0][0].lower(),'name', + 'cursor.description[x][0] must return column name' + ) + self.assertEqual(cur.description[0][1],self.driver.STRING, + 'cursor.description[x][1] must return column type. Got %r' + % cur.description[0][1] + ) + + # Make sure self.description gets reset + self.executeDDL2(cur) + self.assertEqual(cur.description,None, + 'cursor.description not being set to None when executing ' + 'no-result statements (eg. DDL)' + ) + finally: + con.close() + + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + _failUnless(self,cur.rowcount in (-1,0), # Bug #543885 + 'cursor.rowcount should be -1 or 0 after executing no-result ' + 'statements' + ) + cur.execute("%s into %sbooze values ('Victoria Bitter')" % ( + self.insert, self.table_prefix + )) + _failUnless(self,cur.rowcount in (-1,1), + 'cursor.rowcount should == number or rows inserted, or ' + 'set to -1 after executing an insert statement' + ) + cur.execute("select name from %sbooze" % self.table_prefix) + _failUnless(self,cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) + _failUnless(self,cur.rowcount in (-1,0), # Bug #543885 + 'cursor.rowcount should be -1 or 0 after executing no-result ' + 'statements' + ) + finally: + con.close() + + lower_func = 'lower' + def test_callproc(self): + con = self._connect() + try: + cur = con.cursor() + if self.lower_func and hasattr(cur,'callproc'): + r = cur.callproc(self.lower_func,('FOO',)) + self.assertEqual(len(r),1) + self.assertEqual(r[0],'FOO') + r = cur.fetchall() + self.assertEqual(len(r),1,'callproc produced no result set') + self.assertEqual(len(r[0]),1, + 'callproc produced invalid result set' + ) + self.assertEqual(r[0][0],'foo', + 'callproc produced invalid results' + ) + finally: + con.close() + + def test_close(self): + con = self._connect() + try: + cur = con.cursor() + finally: + con.close() + + # cursor.execute should raise an Error if called after connection + # closed + self.assertRaises(self.driver.Error,self.executeDDL1,cur) + + # connection.commit should raise an Error if called after connection' + # closed.' + self.assertRaises(self.driver.Error,con.commit) + + def test_non_idempotent_close(self): + con = self._connect() + con.close() + # connection.close should raise an Error if called more than once + #!!! reasonable persons differ about the usefulness of this test and this feature !!! + self.assertRaises(self.driver.Error,con.close) + + def test_execute(self): + con = self._connect() + try: + cur = con.cursor() + self._paraminsert(cur) + finally: + con.close() + + def _paraminsert(self,cur): + self.executeDDL2(cur) + cur.execute("%s into %sbarflys values ('Victoria Bitter', 'thi%%s :may ca%%(u)se? troub:1e')" % ( + self.insert, self.table_prefix + )) + _failUnless(self,cur.rowcount in (-1,1)) + + if self.driver.paramstyle == 'qmark': + cur.execute( + "%s into %sbarflys values (?, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix), + ("Cooper's",) + ) + elif self.driver.paramstyle == 'numeric': + cur.execute( + "%s into %sbarflys values (:1, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix), + ("Cooper's",) + ) + elif self.driver.paramstyle == 'named': + cur.execute( + "%s into %sbarflys values (:beer, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix), + {'beer':"Cooper's"} + ) + elif self.driver.paramstyle == 'format': + cur.execute( + "%s into %sbarflys values (%%s, 'thi%%%%s :may ca%%%%(u)se? troub:1e')" % (self.insert, self.table_prefix), + ("Cooper's",) + ) + elif self.driver.paramstyle == 'pyformat': + cur.execute( + "%s into %sbarflys values (%%(beer)s, 'thi%%%%s :may ca%%%%(u)se? troub:1e')" % (self.insert, self.table_prefix), + {'beer':"Cooper's"} + ) + else: + self.fail('Invalid paramstyle') + _failUnless(self,cur.rowcount in (-1,1)) + + cur.execute('select name, drink from %sbarflys' % self.table_prefix) + res = cur.fetchall() + self.assertEqual(len(res),2,'cursor.fetchall returned too few rows') + beers = [res[0][0],res[1][0]] + beers.sort() + self.assertEqual(beers[0],"Cooper's", + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly' + ) + self.assertEqual(beers[1],"Victoria Bitter", + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly' + ) + trouble = "thi%s :may ca%(u)se? troub:1e" + self.assertEqual(res[0][1], trouble, + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly. Got=%s, Expected=%s' % (repr(res[0][1]), repr(trouble))) + self.assertEqual(res[1][1], trouble, + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly. Got=%s, Expected=%s' % (repr(res[1][1]), repr(trouble) + )) + + def test_executemany(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + largs = [ ("Cooper's",) , ("Boag's",) ] + margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ] + if self.driver.paramstyle == 'qmark': + cur.executemany( + '%s into %sbooze values (?)' % (self.insert, self.table_prefix), + largs + ) + elif self.driver.paramstyle == 'numeric': + cur.executemany( + '%s into %sbooze values (:1)' % (self.insert, self.table_prefix), + largs + ) + elif self.driver.paramstyle == 'named': + cur.executemany( + '%s into %sbooze values (:beer)' % (self.insert, self.table_prefix), + margs + ) + elif self.driver.paramstyle == 'format': + cur.executemany( + '%s into %sbooze values (%%s)' % (self.insert, self.table_prefix), + largs + ) + elif self.driver.paramstyle == 'pyformat': + cur.executemany( + '%s into %sbooze values (%%(beer)s)' % ( + self.insert, self.table_prefix + ), + margs + ) + else: + self.fail('Unknown paramstyle') + _failUnless(self,cur.rowcount in (-1,2), + 'insert using cursor.executemany set cursor.rowcount to ' + 'incorrect value %r' % cur.rowcount + ) + cur.execute('select name from %sbooze' % self.table_prefix) + res = cur.fetchall() + self.assertEqual(len(res),2, + 'cursor.fetchall retrieved incorrect number of rows' + ) + beers = [res[0][0],res[1][0]] + beers.sort() + self.assertEqual(beers[0],"Boag's",'incorrect data retrieved') + self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved') + finally: + con.close() + + def test_fetchone(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchone should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error,cur.fetchone) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) + self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if a query retrieves ' + 'no rows' + ) + _failUnless(self,cur.rowcount in (-1,0)) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("%s into %sbooze values ('Victoria Bitter')" % ( + self.insert, self.table_prefix + )) + self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchone() + self.assertEqual(len(r),1, + 'cursor.fetchone should have retrieved a single row' + ) + self.assertEqual(r[0],'Victoria Bitter', + 'cursor.fetchone retrieved incorrect data' + ) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if no more rows available' + ) + _failUnless(self,cur.rowcount in (-1,1)) + finally: + con.close() + + samples = [ + 'Carlton Cold', + 'Carlton Draft', + 'Mountain Goat', + 'Redback', + 'Victoria Bitter', + 'XXXX' + ] + + def _populate(self): + ''' Return a list of sql commands to setup the DB for the fetch + tests. + ''' + populate = [ + "%s into %sbooze values ('%s')" % (self.insert, self.table_prefix, s) + for s in self.samples + ] + return populate + + def test_fetchmany(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchmany should raise an Error if called without + #issuing a query + self.assertRaises(self.driver.Error,cur.fetchmany,4) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchmany() + self.assertEqual(len(r),1, + 'cursor.fetchmany retrieved incorrect number of rows, ' + 'default of arraysize is one.' + ) + cur.arraysize=10 + r = cur.fetchmany(3) # Should get 3 rows + self.assertEqual(len(r),3, + 'cursor.fetchmany retrieved incorrect number of rows' + ) + r = cur.fetchmany(4) # Should get 2 more + self.assertEqual(len(r),2, + 'cursor.fetchmany retrieved incorrect number of rows' + ) + r = cur.fetchmany(4) # Should be an empty sequence + self.assertEqual(len(r),0, + 'cursor.fetchmany should return an empty sequence after ' + 'results are exhausted' + ) + _failUnless(self,cur.rowcount in (-1,6)) + + # Same as above, using cursor.arraysize + cur.arraysize=4 + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchmany() # Should get 4 rows + self.assertEqual(len(r),4, + 'cursor.arraysize not being honoured by fetchmany' + ) + r = cur.fetchmany() # Should get 2 more + self.assertEqual(len(r),2) + r = cur.fetchmany() # Should be an empty sequence + self.assertEqual(len(r),0) + _failUnless(self,cur.rowcount in (-1,6)) + + cur.arraysize=6 + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchmany() # Should get all rows + _failUnless(self,cur.rowcount in (-1,6)) + self.assertEqual(len(rows),6) + self.assertEqual(len(rows),6) + rows = [r[0] for r in rows] + rows.sort() + + # Make sure we get the right data back out + for i in range(0,6): + self.assertEqual(rows[i],self.samples[i], + 'incorrect data retrieved by cursor.fetchmany' + ) + + rows = cur.fetchmany() # Should return an empty list + self.assertEqual(len(rows),0, + 'cursor.fetchmany should return an empty sequence if ' + 'called after the whole result set has been fetched' + ) + _failUnless(self,cur.rowcount in (-1,6)) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + r = cur.fetchmany() # Should get empty sequence + self.assertEqual(len(r),0, + 'cursor.fetchmany should return an empty sequence if ' + 'query retrieved no rows' + ) + _failUnless(self,cur.rowcount in (-1,0)) + + finally: + con.close() + + def test_fetchall(self): + con = self._connect() + try: + cur = con.cursor() + # cursor.fetchall should raise an Error if called + # without executing a query that may return rows (such + # as a select) + self.assertRaises(self.driver.Error, cur.fetchall) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + # cursor.fetchall should raise an Error if called + # after executing a a statement that cannot return rows + self.assertRaises(self.driver.Error,cur.fetchall) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchall() + _failUnless(self,cur.rowcount in (-1,len(self.samples))) + self.assertEqual(len(rows),len(self.samples), + 'cursor.fetchall did not retrieve all rows' + ) + rows = [r[0] for r in rows] + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'cursor.fetchall retrieved incorrect rows' + ) + rows = cur.fetchall() + self.assertEqual( + len(rows),0, + 'cursor.fetchall should return an empty list if called ' + 'after the whole result set has been fetched' + ) + _failUnless(self,cur.rowcount in (-1,len(self.samples))) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + rows = cur.fetchall() + _failUnless(self,cur.rowcount in (-1,0)) + self.assertEqual(len(rows),0, + 'cursor.fetchall should return an empty list if ' + 'a select query returns no rows' + ) + + finally: + con.close() + + def test_mixedfetch(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows1 = cur.fetchone() + rows23 = cur.fetchmany(2) + rows4 = cur.fetchone() + rows56 = cur.fetchall() + _failUnless(self,cur.rowcount in (-1,6)) + self.assertEqual(len(rows23),2, + 'fetchmany returned incorrect number of rows' + ) + self.assertEqual(len(rows56),2, + 'fetchall returned incorrect number of rows' + ) + + rows = [rows1[0]] + rows.extend([rows23[0][0],rows23[1][0]]) + rows.append(rows4[0]) + rows.extend([rows56[0][0],rows56[1][0]]) + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'incorrect data retrieved or inserted' + ) + finally: + con.close() + + def help_nextset_setUp(self,cur): + ''' Should create a procedure called deleteme + that returns two result sets, first the + number of rows in booze then "name from booze" + ''' + raise NotImplementedError('Helper not implemented') + #sql=""" + # create procedure deleteme as + # begin + # select count(*) from booze + # select name from booze + # end + #""" + #cur.execute(sql) + + def help_nextset_tearDown(self,cur): + 'If cleaning up is needed after nextSetTest' + raise NotImplementedError('Helper not implemented') + #cur.execute("drop procedure deleteme") + + def test_nextset(self): + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur,'nextset'): + return + + try: + self.executeDDL1(cur) + sql=self._populate() + for sql in self._populate(): + cur.execute(sql) + + self.help_nextset_setUp(cur) + + cur.callproc('deleteme') + numberofrows=cur.fetchone() + assert numberofrows[0]== len(self.samples) + assert cur.nextset() + names=cur.fetchall() + assert len(names) == len(self.samples) + s=cur.nextset() + assert s == None,'No more return sets, should return None' + finally: + self.help_nextset_tearDown(cur) + + finally: + con.close() + + def test_nextset(self): + raise NotImplementedError('Drivers need to override this test') + + def test_arraysize(self): + # Not much here - rest of the tests for this are in test_fetchmany + con = self._connect() + try: + cur = con.cursor() + _failUnless(self,hasattr(cur,'arraysize'), + 'cursor.arraysize must be defined' + ) + finally: + con.close() + + def test_setinputsizes(self): + con = self._connect() + try: + cur = con.cursor() + cur.setinputsizes( (25,) ) + self._paraminsert(cur) # Make sure cursor still works + finally: + con.close() + + def test_setoutputsize_basic(self): + # Basic test is to make sure setoutputsize doesn't blow up + con = self._connect() + try: + cur = con.cursor() + cur.setoutputsize(1000) + cur.setoutputsize(2000,0) + self._paraminsert(cur) # Make sure the cursor still works + finally: + con.close() + + def test_setoutputsize(self): + # Real test for setoutputsize is driver dependant + raise NotImplementedError('Driver needed to override this test') + + def test_None(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + cur.execute("%s into %sbarflys values ('a', NULL)" % (self.insert, self.table_prefix)) + cur.execute('select drink from %sbarflys' % self.table_prefix) + r = cur.fetchall() + self.assertEqual(len(r),1) + self.assertEqual(len(r[0]),1) + self.assertEqual(r[0][0],None,'NULL value not returned as None') + finally: + con.close() + + def test_Date(self): + d1 = self.driver.Date(2002,12,25) + d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0))) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(d1),str(d2)) + + def test_Time(self): + t1 = self.driver.Time(13,45,30) + t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0))) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(t1),str(t2)) + + def test_Timestamp(self): + t1 = self.driver.Timestamp(2002,12,25,13,45,30) + t2 = self.driver.TimestampFromTicks( + time.mktime((2002,12,25,13,45,30,0,0,0)) + ) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(t1),str(t2)) + + def test_Binary(self): + b = self.driver.Binary(str2bytes('Something')) + b = self.driver.Binary(str2bytes('')) + + def test_STRING(self): + _failUnless(self, hasattr(self.driver,'STRING'), + 'module.STRING must be defined' + ) + + def test_BINARY(self): + _failUnless(self, hasattr(self.driver,'BINARY'), + 'module.BINARY must be defined.' + ) + + def test_NUMBER(self): + _failUnless(self, hasattr(self.driver,'NUMBER'), + 'module.NUMBER must be defined.' + ) + + def test_DATETIME(self): + _failUnless(self, hasattr(self.driver,'DATETIME'), + 'module.DATETIME must be defined.' + ) + + def test_ROWID(self): + _failUnless(self, hasattr(self.driver,'ROWID'), + 'module.ROWID must be defined.' + ) http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_avatica.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_avatica.py b/python/phoenixdb/tests/test_avatica.py new file mode 100644 index 0000000..6152814 --- /dev/null +++ b/python/phoenixdb/tests/test_avatica.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from phoenixdb.avatica.client import parse_url, urlparse + + +class ParseUrlTest(unittest.TestCase): + + def test_parse_url(self): + self.assertEqual(urlparse.urlparse('http://localhost:8765/'), parse_url('localhost')) + self.assertEqual(urlparse.urlparse('http://localhost:2222/'), parse_url('localhost:2222')) + self.assertEqual(urlparse.urlparse('http://localhost:2222/'), parse_url('http://localhost:2222/')) http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_connection.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_connection.py b/python/phoenixdb/tests/test_connection.py new file mode 100644 index 0000000..2deacf5 --- /dev/null +++ b/python/phoenixdb/tests/test_connection.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import phoenixdb +from phoenixdb.tests import TEST_DB_URL + + +@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database") +class PhoenixConnectionTest(unittest.TestCase): + + def _connect(self, connect_kw_args): + try: + r = phoenixdb.connect(TEST_DB_URL, **connect_kw_args) + except AttributeError: + self.fail("Failed to connect") + return r + + def test_connection_credentials(self): + connect_kw_args = {'user': 'SCOTT', 'password': 'TIGER', 'readonly': 'True'} + con = self._connect(connect_kw_args) + try: + self.assertEqual( + con._connection_args, {'user': 'SCOTT', 'password': 'TIGER'}, + 'Should have extract user and password') + self.assertEqual( + con._filtered_args, {'readonly': 'True'}, + 'Should have not extracted foo') + finally: + con.close() http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_db.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_db.py b/python/phoenixdb/tests/test_db.py new file mode 100644 index 0000000..2fb1a2a --- /dev/null +++ b/python/phoenixdb/tests/test_db.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import phoenixdb +import phoenixdb.cursor +from phoenixdb.errors import InternalError +from phoenixdb.tests import TEST_DB_URL + + +@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database") +class PhoenixDatabaseTest(unittest.TestCase): + + def test_select_literal(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True) + self.addCleanup(db.close) + + with db.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)") + cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)]) + + with db.cursor() as cursor: + cursor.itersize = 4 + cursor.execute("SELECT * FROM test WHERE id>1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)]) + + def test_select_parameter(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True) + self.addCleanup(db.close) + + with db.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)") + cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)]) + + with db.cursor() as cursor: + cursor.itersize = 4 + cursor.execute("SELECT * FROM test WHERE id>? ORDER BY id", [1]) + self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)]) + + def _check_dict_cursor(self, cursor): + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)") + cursor.execute("UPSERT INTO test VALUES (?, ?)", [1, 'text 1']) + cursor.execute("SELECT * FROM test ORDER BY id") + self.assertEqual(cursor.fetchall(), [{'ID': 1, 'TEXT': 'text 1'}]) + + def test_dict_cursor_default_parameter(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True, cursor_factory=phoenixdb.cursor.DictCursor) + self.addCleanup(db.close) + + with db.cursor() as cursor: + self._check_dict_cursor(cursor) + + def test_dict_cursor_default_attribute(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True) + db.cursor_factory = phoenixdb.cursor.DictCursor + self.addCleanup(db.close) + + with db.cursor() as cursor: + self._check_dict_cursor(cursor) + + def test_dict_cursor(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True) + self.addCleanup(db.close) + + with db.cursor(cursor_factory=phoenixdb.cursor.DictCursor) as cursor: + self._check_dict_cursor(cursor) + + def test_schema(self): + db = phoenixdb.connect(TEST_DB_URL, autocommit=True) + self.addCleanup(db.close) + + with db.cursor() as cursor: + try: + cursor.execute("CREATE SCHEMA IF NOT EXISTS test_schema") + except InternalError as e: + if "phoenix.schema.isNamespaceMappingEnabled" in e.message: + self.skipTest(e.message) + raise + + cursor.execute("DROP TABLE IF EXISTS test_schema.test") + cursor.execute("CREATE TABLE test_schema.test (id INTEGER PRIMARY KEY, text VARCHAR)") + cursor.execute("UPSERT INTO test_schema.test VALUES (?, ?)", [1, 'text 1']) + cursor.execute("SELECT * FROM test_schema.test ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, 'text 1']]) http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_dbapi20.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_dbapi20.py b/python/phoenixdb/tests/test_dbapi20.py new file mode 100644 index 0000000..0e5c2e4 --- /dev/null +++ b/python/phoenixdb/tests/test_dbapi20.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import phoenixdb +from . import dbapi20 +from phoenixdb.tests import TEST_DB_URL + + +@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database") +class PhoenixDatabaseAPI20Test(dbapi20.DatabaseAPI20Test): + driver = phoenixdb + connect_args = (TEST_DB_URL, ) + + ddl1 = 'create table %sbooze (name varchar(20) primary key)' % dbapi20.DatabaseAPI20Test.table_prefix + ddl2 = 'create table %sbarflys (name varchar(20) primary key, drink varchar(30))' % dbapi20.DatabaseAPI20Test.table_prefix + insert = 'upsert' + + def test_nextset(self): + pass + + def test_setoutputsize(self): + pass + + def _connect(self): + con = dbapi20.DatabaseAPI20Test._connect(self) + con.autocommit = True + return con + + def test_None(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL2(cur) + cur.execute("%s into %sbarflys values ('a', NULL)" % (self.insert, self.table_prefix)) + cur.execute('select drink from %sbarflys' % self.table_prefix) + r = cur.fetchall() + self.assertEqual(len(r), 1) + self.assertEqual(len(r[0]), 1) + self.assertEqual(r[0][0], None, 'NULL value not returned as None') + finally: + con.close() + + def test_autocommit(self): + con = dbapi20.DatabaseAPI20Test._connect(self) + self.assertFalse(con.autocommit) + con.autocommit = True + self.assertTrue(con.autocommit) + con.autocommit = False + self.assertFalse(con.autocommit) + con.close() + + def test_readonly(self): + con = dbapi20.DatabaseAPI20Test._connect(self) + self.assertFalse(con.readonly) + con.readonly = True + self.assertTrue(con.readonly) + con.readonly = False + self.assertFalse(con.readonly) + con.close() + + def test_iter(self): + # https://www.python.org/dev/peps/pep-0249/#iter + con = self._connect() + try: + cur = con.cursor() + if hasattr(cur, '__iter__'): + self.assertIs(cur, iter(cur)) + finally: + con.close() + + def test_next(self): + # https://www.python.org/dev/peps/pep-0249/#next + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur, 'next'): + return + + # cursor.next should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error, cur.next) + + # cursor.next should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) + self.assertRaises(self.driver.Error, cur.next) + + # cursor.next should return None if a query retrieves ' + # no rows + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertRaises(StopIteration, cur.next) + self.failUnless(cur.rowcount in (-1, 0)) + + # cursor.next should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("%s into %sbooze values ('Victoria Bitter')" % ( + self.insert, self.table_prefix + )) + self.assertRaises(self.driver.Error, cur.next) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.next() + self.assertEqual(len(r), 1, 'cursor.next should have retrieved a row with one column') + self.assertEqual(r[0], 'Victoria Bitter', 'cursor.next retrieved incorrect data') + # cursor.next should raise StopIteration if no more rows available + self.assertRaises(StopIteration, cur.next) + self.failUnless(cur.rowcount in (-1, 1)) + finally: + con.close() http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_errors.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_errors.py b/python/phoenixdb/tests/test_errors.py new file mode 100644 index 0000000..191ccb1 --- /dev/null +++ b/python/phoenixdb/tests/test_errors.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from phoenixdb.tests import DatabaseTestCase + + +class ProgrammingErrorTest(DatabaseTestCase): + + def test_invalid_sql(self): + with self.conn.cursor() as cursor: + with self.assertRaises(self.conn.ProgrammingError) as cm: + cursor.execute("UPS") + self.assertEqual("Syntax error. Encountered \"UPS\" at line 1, column 1.", cm.exception.message) + self.assertEqual(601, cm.exception.code) + self.assertEqual("42P00", cm.exception.sqlstate) + + +class IntegrityErrorTest(DatabaseTestCase): + + def test_null_in_pk(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key") + with self.conn.cursor() as cursor: + with self.assertRaises(self.conn.IntegrityError) as cm: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (NULL)") + self.assertEqual("Constraint violation. PHOENIXDB_TEST_TBL1.ID may not be null", cm.exception.message) + self.assertEqual(218, cm.exception.code) + self.assertIn(cm.exception.sqlstate, ("22018", "23018")) + + +class DataErrorTest(DatabaseTestCase): + + def test_number_outside_of_range(self): + self.createTable("phoenixdb_test_tbl1", "id tinyint primary key") + with self.conn.cursor() as cursor: + with self.assertRaises(self.conn.DataError) as cm: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (10000)") + self.assertEqual("Type mismatch. TINYINT and INTEGER for 10000", cm.exception.message) + self.assertEqual(203, cm.exception.code) + self.assertEqual("22005", cm.exception.sqlstate) + + def test_division_by_zero(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key") + with self.conn.cursor() as cursor: + with self.assertRaises(self.conn.DataError) as cm: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2/0)") + self.assertEqual("Divide by zero.", cm.exception.message) + self.assertEqual(202, cm.exception.code) + self.assertEqual("22012", cm.exception.sqlstate) http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b72808b/python/phoenixdb/tests/test_types.py ---------------------------------------------------------------------- diff --git a/python/phoenixdb/tests/test_types.py b/python/phoenixdb/tests/test_types.py new file mode 100644 index 0000000..2cef0f2 --- /dev/null +++ b/python/phoenixdb/tests/test_types.py @@ -0,0 +1,327 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import unittest +import datetime +import phoenixdb +from decimal import Decimal +from phoenixdb.tests import DatabaseTestCase + + +class TypesTest(DatabaseTestCase): + + def checkIntType(self, type_name, min_value, max_value): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val {}".format(type_name)) + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 1)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [1]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [min_value]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [max_value]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER) + self.assertEqual(cursor.fetchall(), [[1, 1], [2, None], [3, 1], [4, None], [5, min_value], [6, max_value]]) + + self.assertRaises( + self.conn.DatabaseError, cursor.execute, + "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(min_value - 1)) + + self.assertRaises( + self.conn.DatabaseError, cursor.execute, + "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(max_value + 1)) + + # XXX The server silently truncates the values +# self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1]) +# self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1]) + + def test_integer(self): + self.checkIntType("integer", -2147483648, 2147483647) + + def test_unsigned_int(self): + self.checkIntType("unsigned_int", 0, 2147483647) + + def test_bigint(self): + self.checkIntType("bigint", -9223372036854775808, 9223372036854775807) + + def test_unsigned_long(self): + self.checkIntType("unsigned_long", 0, 9223372036854775807) + + def test_tinyint(self): + self.checkIntType("tinyint", -128, 127) + + def test_unsigned_tinyint(self): + self.checkIntType("unsigned_tinyint", 0, 127) + + def test_smallint(self): + self.checkIntType("smallint", -32768, 32767) + + def test_unsigned_smallint(self): + self.checkIntType("unsigned_smallint", 0, 32767) + + def checkFloatType(self, type_name, min_value, max_value): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val {}".format(type_name)) + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 1)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [1]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [min_value]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [max_value]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER) + rows = cursor.fetchall() + self.assertEqual([r[0] for r in rows], [1, 2, 3, 4, 5, 6]) + self.assertEqual(rows[0][1], 1.0) + self.assertEqual(rows[1][1], None) + self.assertEqual(rows[2][1], 1.0) + self.assertEqual(rows[3][1], None) + self.assertAlmostEqual(rows[4][1], min_value) + self.assertAlmostEqual(rows[5][1], max_value) + + def test_float(self): + self.checkFloatType("float", -3.4028234663852886e+38, 3.4028234663852886e+38) + + def test_unsigned_float(self): + self.checkFloatType("unsigned_float", 0, 3.4028234663852886e+38) + + def test_double(self): + self.checkFloatType("double", -1.7976931348623158E+308, 1.7976931348623158E+308) + + def test_unsigned_double(self): + self.checkFloatType("unsigned_double", 0, 1.7976931348623158E+308) + + def test_decimal(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val decimal(8,3)") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 33333.333)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [33333.333]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [Decimal('33333.333')]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER) + rows = cursor.fetchall() + self.assertEqual([r[0] for r in rows], [1, 2, 3, 4, 5]) + self.assertEqual(rows[0][1], Decimal('33333.333')) + self.assertEqual(rows[1][1], None) + self.assertEqual(rows[2][1], Decimal('33333.333')) + self.assertEqual(rows[3][1], Decimal('33333.333')) + self.assertEqual(rows[4][1], None) + self.assertRaises( + self.conn.DatabaseError, cursor.execute, + "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [Decimal('1234567890')]) + self.assertRaises( + self.conn.DatabaseError, cursor.execute, + "UPSERT INTO phoenixdb_test_tbl1 VALUES (101, ?)", [Decimal('123456.789')]) + + def test_boolean(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val boolean") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, TRUE)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, FALSE)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [True]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [False]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [None]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.description[1].type_code, phoenixdb.BOOLEAN) + self.assertEqual(cursor.fetchall(), [[1, True], [2, False], [3, None], [4, True], [5, False], [6, None]]) + + def test_time(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val time") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '1970-01-01 12:01:02')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Time(12, 1, 2)]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.time(12, 1, 2)]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.time(12, 1, 2)], + [2, None], + [3, datetime.time(12, 1, 2)], + [4, datetime.time(12, 1, 2)], + [5, None], + ]) + + @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-797") + def test_time_full(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val time") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + [2, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + ]) + + def test_date(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 00:00:00')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Date(2015, 7, 12)]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.date(2015, 7, 12)]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.date(2015, 7, 12)], + [3, datetime.date(2015, 7, 12)], + [4, datetime.date(2015, 7, 12)], + ]) + + @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-798") + def test_date_full(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + [2, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + ]) + + def test_date_null(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [None]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") # raises NullPointerException on the server + self.assertEqual(cursor.fetchall(), [ + [1, None], + [2, None], + ]) + + def test_timestamp(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val timestamp") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Timestamp(2015, 7, 12, 13, 1, 2)]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + [2, None], + [3, datetime.datetime(2015, 7, 12, 13, 1, 2)], + [4, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)], + [5, None], + ]) + + @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-796") + def test_timestamp_full(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val timestamp") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123456789')") + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123456789)], + ]) + + def test_varchar(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'abc')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", ['abc']) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", ['']) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, 'abc'], [2, None], [3, 'abc'], [4, None], [5, None], [6, None]]) + + def test_varchar_very_long(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar") + with self.conn.cursor() as cursor: + value = '1234567890' * 1000 + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ?)", [value]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, value]]) + + def test_varchar_limited(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar(2)") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", ['ab']) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", ['']) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, 'ab'], [2, None], [3, 'ab'], [4, None], [5, None], [6, None]]) + self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')") + + def test_char_null(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val char(2)") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", ['']) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[2, None], [4, None], [5, None], [6, None]]) + self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')") + + def test_char(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val char(2)") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", ['ab']) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, 'a')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", ['b']) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, 'ab'], [2, 'ab'], [3, 'a'], [4, 'b']]) + self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')") + + def test_binary(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(2)") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [phoenixdb.Binary(b'ab')]) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, '\x01\x00')") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [phoenixdb.Binary(b'\x01\x00')]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, b'ab'], + [2, b'ab'], + [3, b'\x01\x00'], + [4, b'\x01\x00'], + ]) + + def test_binary_all_bytes(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(256)") + with self.conn.cursor() as cursor: + if sys.version_info[0] < 3: + value = ''.join(map(chr, range(256))) + else: + value = bytes(range(256)) + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ?)", [phoenixdb.Binary(value)]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [[1, value]]) + + @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-1050 https://issues.apache.org/jira/browse/PHOENIX-2585") + def test_array(self): + self.createTable("phoenixdb_test_tbl1", "id integer primary key, val integer[]") + with self.conn.cursor() as cursor: + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ARRAY[1, 2])") + cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [[2, 3]]) + cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") + self.assertEqual(cursor.fetchall(), [ + [1, [1, 2]], + [2, [2, 3]], + ])