This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git
The following commit(s) were added to refs/heads/master by this push: new 5b61a18 PHOENIX-6410 Expose primary keys metadata via SqlAlchemy 5b61a18 is described below commit 5b61a18c523e3cceaddf2dd0742df3f804c3047d Author: Istvan Toth <st...@apache.org> AuthorDate: Fri May 14 20:59:46 2021 +0200 PHOENIX-6410 Expose primary keys metadata via SqlAlchemy --- python-phoenixdb/README.rst | 10 +- python-phoenixdb/phoenixdb/avatica/client.py | 10 ++ python-phoenixdb/phoenixdb/cursor.py | 15 +++ python-phoenixdb/phoenixdb/meta.py | 114 +++++++++++++++++++++ python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py | 46 +++++++-- python-phoenixdb/phoenixdb/tests/test_db.py | 61 ++++++++++- .../phoenixdb/tests/test_sqlalchemy.py | 22 +++- 7 files changed, 261 insertions(+), 17 deletions(-) diff --git a/python-phoenixdb/README.rst b/python-phoenixdb/README.rst index a7a32f6..2f1a562 100644 --- a/python-phoenixdb/README.rst +++ b/python-phoenixdb/README.rst @@ -58,14 +58,14 @@ necessary requirements:: python setup.py develop You can start a Phoenix QueryServer instance on http://localhost:8765 for testing by running -the following command in the phoenix-queryserver directory:: +the following command in the phoenix-queryserver-parent directory:: mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \ -Dit.test=QueryServerBasicsIT\#startLocalPQS \ -Ddo.not.randomize.pqs.port=true -Dstart.unsecure.pqs=true You can start a secure (https+kerberos) Phoenix QueryServer instance on https://localhost:8765 -for testing by running the following command in the phoenix-queryserver-parent directory:: mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \ -Dit.test=SecureQueryServerPhoenixDBIT\#startLocalPQS \ @@ -77,7 +77,7 @@ up the environment for the tests. 
If you want to use the library without installing the phoenixdb library, you can use the `PYTHONPATH` environment variable to point to the library directly:: - cd $PHOENIX_HOME/python + cd phoenix-queryserver-parent/python-phoenixdb python setup.py build cd ~/my_project PYTHONPATH=$PHOENIX_HOME/build/lib python my_app.py @@ -110,8 +110,8 @@ Similarly, tox can be used to run the test suite against multiple Python version pyenv global 2.7.14 3.5.5 3.6.4 PHOENIXDB_TEST_DB_URL='http://localhost:8765' tox -You can use tox and docker to run the tests on all supported python versions without installing the -environments locally:: +You can use tox and docker to run the tests on supported python versions up to 3.8 without +installing the environments locally:: docker build -t toxtest . docker run --rm -v `pwd`:/src toxtest diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py index d6f42fe..48b6406 100644 --- a/python-phoenixdb/phoenixdb/avatica/client.py +++ b/python-phoenixdb/phoenixdb/avatica/client.py @@ -310,6 +310,16 @@ class AvaticaClient(object): response.ParseFromString(response_data) return response + def get_sync_results(self, connection_id, statement_id, state): + request = requests_pb2.SyncResultsRequest() + request.connection_id = connection_id + request.statement_id = statement_id + request.state.CopyFrom(state) + response_data = self._apply(request, 'SyncResultsResponse') + syncResultResponse = responses_pb2.SyncResultsResponse() + syncResultResponse.ParseFromString(response_data) + return syncResultResponse + def connection_sync_dict(self, connection_id, connProps=None): conn_props = self.connection_sync(connection_id, connProps) return { diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py index ad09106..e716115 100644 --- a/python-phoenixdb/phoenixdb/cursor.py +++ b/python-phoenixdb/phoenixdb/cursor.py @@ -246,6 +246,21 @@ class Cursor(object): self._connection._id, 
self._id, [self._transform_parameters(p) for p in seq_of_parameters]) + def get_sync_results(self, state): + if self._closed: + raise ProgrammingError('The cursor is already closed.') + if self._id is None: + self._set_id(self._connection._client.create_statement(self._connection._id)) + return self._connection._client.get_sync_results(self._connection._id, self._id, state) + + def fetch(self, signature): + if self._closed: + raise ProgrammingError('The cursor is already closed.') + self._updatecount = -1 + self._set_signature(signature) + frame = self._connection._client.fetch(self._connection._id, self._id, 0, self.itersize) + self._set_frame(frame) + def _transform_row(self, row): """Transforms a Row into Python values. diff --git a/python-phoenixdb/phoenixdb/meta.py b/python-phoenixdb/phoenixdb/meta.py index 18ad147..d5987bb 100644 --- a/python-phoenixdb/phoenixdb/meta.py +++ b/python-phoenixdb/phoenixdb/meta.py @@ -16,6 +16,7 @@ import sys import logging +from phoenixdb.avatica.proto import common_pb2 from phoenixdb.errors import ProgrammingError from phoenixdb.cursor import DictCursor @@ -83,6 +84,119 @@ class Meta(object): cursor._process_result(result) return cursor.fetchall() + def get_primary_keys(self, catalog=None, schema=None, table=None): + if self._connection._closed: + raise ProgrammingError('The cursor is already closed.') + + state = common_pb2.QueryState() + state.type = common_pb2.StateType.METADATA + state.op = common_pb2.MetaDataOperation.GET_PRIMARY_KEYS + state.has_args = True + state.has_op = True + + catalog_arg = self._moa_string_arg_factory(catalog) + schema_arg = self._moa_string_arg_factory(schema) + table_arg = self._moa_string_arg_factory(table) + state.args.extend([catalog_arg, schema_arg, table_arg]) + + with DictCursor(self._connection) as cursor: + syncResultResponse = cursor.get_sync_results(state) + if not syncResultResponse.more_results: + return [] + + signature = common_pb2.Signature() + 
signature.columns.append(self._column_meta_data_factory(1, 'TABLE_CAT', 12)) + signature.columns.append(self._column_meta_data_factory(2, 'TABLE_SCHEM', 12)) + signature.columns.append(self._column_meta_data_factory(3, 'TABLE_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(4, 'COLUMN_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(5, 'KEY_SEQ', 5)) + signature.columns.append(self._column_meta_data_factory(6, 'PK_NAME', 12)) + # The following are non-standard Phoenix extensions + # This returns '\x00\x00\x00A' or '\x00\x00\x00D' , but that's consistent with Java + signature.columns.append(self._column_meta_data_factory(7, 'ASC_OR_DESC', 12)) + signature.columns.append(self._column_meta_data_factory(8, 'DATA_TYPE', 5)) + signature.columns.append(self._column_meta_data_factory(9, 'TYPE_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(10, 'COLUMN_SIZE', 5)) + signature.columns.append(self._column_meta_data_factory(11, 'TYPE_ID', 5)) + signature.columns.append(self._column_meta_data_factory(12, 'VIEW_CONSTANT', 12)) + + cursor.fetch(signature) + return cursor.fetchall() + + def get_index_info(self, catalog=None, schema=None, table=None, unique=False, approximate=False): + if self._connection._closed: + raise ProgrammingError('The cursor is already closed.') + + state = common_pb2.QueryState() + state.type = common_pb2.StateType.METADATA + state.op = common_pb2.MetaDataOperation.GET_INDEX_INFO + state.has_args = True + state.has_op = True + + catalog_arg = self._moa_string_arg_factory(catalog) + schema_arg = self._moa_string_arg_factory(schema) + table_arg = self._moa_string_arg_factory(table) + unique_arg = self._moa_bool_arg_factory(unique) + approximate_arg = self._moa_bool_arg_factory(approximate) + + state.args.extend([catalog_arg, schema_arg, table_arg, unique_arg, approximate_arg]) + + with DictCursor(self._connection) as cursor: + syncResultResponse = cursor.get_sync_results(state) + if not 
syncResultResponse.more_results: + return [] + + signature = common_pb2.Signature() + signature.columns.append(self._column_meta_data_factory(1, 'TABLE_CAT', 12)) + signature.columns.append(self._column_meta_data_factory(2, 'TABLE_SCHEM', 12)) + signature.columns.append(self._column_meta_data_factory(3, 'TABLE_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(4, 'NON_UNIQUE', 16)) + signature.columns.append(self._column_meta_data_factory(5, 'INDEX_QUALIFIER', 12)) + signature.columns.append(self._column_meta_data_factory(6, 'INDEX_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(7, 'TYPE', 5)) + signature.columns.append(self._column_meta_data_factory(8, 'ORDINAL_POSITION', 5)) + signature.columns.append(self._column_meta_data_factory(9, 'COLUMN_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(10, 'ASC_OR_DESC', 12)) + signature.columns.append(self._column_meta_data_factory(11, 'CARDINALITY', 5)) + signature.columns.append(self._column_meta_data_factory(12, 'PAGES', 5)) + signature.columns.append(self._column_meta_data_factory(13, 'FILTER_CONDITION', 12)) + # The following are non-standard Phoenix extensions + signature.columns.append(self._column_meta_data_factory(14, 'DATA_TYPE', 5)) + signature.columns.append(self._column_meta_data_factory(15, 'TYPE_NAME', 12)) + signature.columns.append(self._column_meta_data_factory(16, 'TYPE_ID', 5)) + signature.columns.append(self._column_meta_data_factory(17, 'COLUMN_FAMILY', 12)) + signature.columns.append(self._column_meta_data_factory(18, 'COLUMN_SIZE', 5)) + signature.columns.append(self._column_meta_data_factory(19, 'ARRAY_SIZE', 5)) + + cursor.fetch(signature) + return cursor.fetchall() + + def _column_meta_data_factory(self, ordinal, column_name, jdbc_code): + cmd = common_pb2.ColumnMetaData() + cmd.ordinal = ordinal + cmd.column_name = column_name + cmd.type.id = jdbc_code + cmd.nullable = 2 + return cmd + + def _moa_string_arg_factory(self, arg): + moa = 
common_pb2.MetaDataOperationArgument() + if arg is None: + moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.NULL + else: + moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.STRING + moa.string_value = arg + return moa + + def _moa_bool_arg_factory(self, arg): + moa = common_pb2.MetaDataOperationArgument() + if arg is None: + moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.NULL + else: + moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.BOOL + moa.bool_value = arg + return moa + def _fix_default(self, rows, catalog=None, schemaPattern=None): '''Workaround for PHOENIX-6003''' if schemaPattern == '': diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py index b402322..8661417 100644 --- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py +++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py @@ -166,19 +166,45 @@ class PhoenixDialect(DefaultDialect): def get_pk_constraint(self, connection, table_name, schema=None, **kw): if schema is None: schema = '' - columns = connection.connect().connection.meta().get_columns( - schemaPattern=schema, tableNamePattern=table_name, *kw) - pk_columns = [col['COLUMN_NAME'] for col in columns if col['KEY_SEQ'] > 0] - return {'constrained_columns': pk_columns} - - def get_indexes(self, conn, table_name, schema=None, **kw): - '''This information does not seem to be exposed via Avatica - TODO: Implement by directly querying SYSTEM tables ? 
''' - return [] + raw = connection.connect().connection.meta().get_primary_keys( + schema=schema, table=table_name) + cooked = { + 'constrained_columns': [] + } + if raw: + cooked['name'] = raw[0]['PK_NAME'] + for row in raw: + cooked['constrained_columns'].insert(row['KEY_SEQ'] - 1, row['COLUMN_NAME']) + return cooked + + def get_indexes(self, connection, table_name, schema=None, **kw): + if schema is None: + schema = '' + raw = connection.connect().connection.meta().get_index_info(schema=schema, table=table_name) + # We know that Phoenix returns the rows ordered by INDEX_NAME and ORDINAL_POSITION + cooked = [] + current = None + for row in raw: + if current is None or row['INDEX_NAME'] != current['name']: + current = { + 'name': row['INDEX_NAME'], + 'unique': not row['NON_UNIQUE'] is False, + 'column_names': [], + } + cooked.append(current) + # Phoenix returns the column names in its internal representation here + # Remove the default CF prefix + canonical_name = row['INDEX_NAME'] + if canonical_name.startswith('0:'): + canonical_name = canonical_name[len(':0')] + if canonical_name.startswith(':'): + canonical_name = canonical_name[len(':')] + current['column_names'].append(canonical_name) + return cooked def get_foreign_keys(self, conn, table_name, schema=None, **kw): '''Foreign keys are a foreign concept to Phoenix, - but SqlAlchemy cannot parse the DB schema if it's not implemented ''' + and SqlAlchemy cannot parse the DB schema if it's not implemented ''' return [] def _map_column(self, raw): diff --git a/python-phoenixdb/phoenixdb/tests/test_db.py b/python-phoenixdb/phoenixdb/tests/test_db.py index da12b23..0c04b11 100644 --- a/python-phoenixdb/phoenixdb/tests/test_db.py +++ b/python-phoenixdb/phoenixdb/tests/test_db.py @@ -17,7 +17,7 @@ import unittest import phoenixdb.cursor from phoenixdb.connection import Connection -from phoenixdb.errors import InternalError +from phoenixdb.errors import InternalError, ProgrammingError from phoenixdb.tests import 
DatabaseTestCase, TEST_DB_URL @@ -172,6 +172,65 @@ class PhoenixDatabaseTest(DatabaseTestCase): {'TABLE_TYPE': 'VIEW'}])) self.assertEqual(meta.get_type_info(), []) + + finally: + cursor.execute('drop table if exists DEFAULT_TABLE') + cursor.execute('drop table if exists A_SCHEMA.A_TABLE') + cursor.execute('drop table if exists B_SCHEMA.B_TABLE') + + def test_meta2(self): + with self.conn.cursor() as cursor: + try: + cursor.execute('drop table if exists DEFAULT_TABLE') + cursor.execute('drop table if exists A_SCHEMA.A_TABLE') + cursor.execute('drop table if exists B_SCHMEA.B_TABLE') + + cursor.execute('''create table DEFAULT_TABLE (ID integer not null, ID2 varchar not null, + V1 integer, V2 varchar, constraint PK PRIMARY KEY (ID DESC, ID2 ASC))''') + cursor.execute('CREATE INDEX GLOBAL_IDX ON DEFAULT_TABLE (V1) INCLUDE (V2)') + cursor.execute('CREATE LOCAL INDEX LOCAL_IDX ON DEFAULT_TABLE (V1)') + cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)') + cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)') + + meta = self.conn.meta() + self.assertTrue(len(meta.get_primary_keys(table='DEFAULT_TABLE')), + [{'ASC_OR_DESC': '\x00\x00\x00D', + 'COLUMN_NAME': 'ID', + 'COLUMN_SIZE': None, + 'DATA_TYPE': 4, + 'KEY_SEQ': 1, + 'PK_NAME': 'PK', + 'TABLE_CAT': None, + 'TABLE_NAME': 'DEFAULT_TABLE', + 'TABLE_SCHEM': None, + 'TYPE_ID': 4, + 'TYPE_NAME': 'INTEGER', + 'VIEW_CONSTANT': None}, + {'ASC_OR_DESC': '\x00\x00\x00A', + 'COLUMN_NAME': 'ID2', + 'COLUMN_SIZE': None, + 'DATA_TYPE': 12, + 'KEY_SEQ': 2, + 'PK_NAME': 'PK', + 'TABLE_CAT': None, + 'TABLE_NAME': 'DEFAULT_TABLE', + 'TABLE_SCHEM': None, + 'TYPE_ID': 12, + 'TYPE_NAME': 'VARCHAR', + 'VIEW_CONSTANT': None}]) + self.assertEqual(len(meta.get_primary_keys(schema='A_SCHEMA', table='A_TABLE')), 1) + try: + self.assertEqual(len(meta.get_primary_keys(schema='A_SCHEMA', table='B_TABLE')), 0) + self.assertTrue(False) + except ProgrammingError: + pass + + self.maxDiff = None + + 
self.assertEqual(meta.get_index_info(table='NON_EXISTENT'), []) + + self.assertTrue(len(meta.get_index_info(table='DEFAULT_TABLE')) > 1) + finally: cursor.execute('drop table if exists DEFAULT_TABLE') cursor.execute('drop table if exists A_SCHEMA.A_TABLE') diff --git a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py index 37ed5a0..c996262 100644 --- a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py +++ b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py @@ -18,6 +18,7 @@ import unittest import sqlalchemy as db from sqlalchemy import text +from sqlalchemy.types import BIGINT, CHAR, VARCHAR from . import TEST_DB_AUTHENTICATION, TEST_DB_AVATICA_PASSWORD, TEST_DB_AVATICA_USER, \ TEST_DB_TRUSTSTORE, TEST_DB_URL @@ -103,8 +104,27 @@ class SQLAlchemyTest(unittest.TestCase): city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY (state, city))''')) + connection.execute('CREATE INDEX GLOBAL_IDX ON US_POPULATION (state) INCLUDE (city)') + connection.execute('CREATE LOCAL INDEX LOCAL_IDX ON US_POPULATION (population)') + columns_result = inspector.get_columns('US_POPULATION') - self.assertEqual(len(columns_result), 3) + # The list is not equal to its representation + self.assertTrue(str(columns_result), + str([{'name': 'STATE', 'type': CHAR(), 'nullable': True, + 'autoincrement': False, 'comment': '', 'default': None}, + {'name': 'CITY', 'type': VARCHAR(), 'nullable': True, + 'autoincrement': False, 'comment': '', 'default': None}, + {'name': 'POPULATION', 'type': BIGINT(), 'nullable': True, + 'autoincrement': False, 'comment': '', 'default': None}])) + + indexes_result = inspector.get_indexes('US_POPULATION') + self.assertTrue(indexes_result, + [{'name': 'GLOBAL_IDX', 'unique': False, 'column_names': ['STATE', 'CITY']}, + {'name': 'LOCAL_IDX', 'unique': False, 'column_names': ['_INDEX_ID', 'POPULATION', 'STATE', 'CITY']}]) + + pk_result = inspector.get_pk_constraint('US_POPULATION') + 
self.assertTrue(pk_result, {'constrained_columns': ['STATE', 'CITY'], 'name': 'MY_PK'}) + finally: connection.execute('drop table if exists us_population')