This is an automated email from the ASF dual-hosted git repository. jingyimei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
The following commit(s) were added to refs/heads/master by this push: new 79bb4be Fix deparsing of a table's DISTRIBUTED BY clause for GPDB 6. 79bb4be is described below commit 79bb4be3cd6b43fa7b090dd508c27b71e632ad27 Author: Heikki Linnakangas <heikki.linnakan...@iki.fi> AuthorDate: Fri Feb 1 18:16:38 2019 +0200 Fix deparsing of a table's DISTRIBUTED BY clause for GPDB 6. JIRA: MADLIB-1298 GPDB 6 makes changes to the gp_distribution_policy catalog table: the 'attrnums' column is renamed to 'distkeys', and its datatype changes from smallint[] to int2vector. It also adds a new column, distopclasses, which needs to be taken into account when looking at a table's distribution key, along with the attribute numbers. See GPDB commits 69ec6926c2 and 242783ae9f for details. Fortunately, GPDB 6 also adds a helper function, pg_catalog.pg_get_table_distributedby(), to do the deparsing for us. Use that function when running on GPDB 6. Closes #349 --- .../modules/utilities/create_indicators.py_in | 13 ++--- .../postgres/modules/utilities/utilities.py_in | 63 ++++++++++++++-------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/ports/postgres/modules/utilities/create_indicators.py_in b/src/ports/postgres/modules/utilities/create_indicators.py_in index fae4fe1..c232e6f 100644 --- a/src/ports/postgres/modules/utilities/create_indicators.py_in +++ b/src/ports/postgres/modules/utilities/create_indicators.py_in @@ -29,7 +29,7 @@ database constructs. 
import plpy from control import MinWarning from utilities import _assert -from utilities import get_distribution_policy +from utilities import get_distributed_by from utilities import split_quoted_delimited_str from utilities import strip_end_quotes from validate_args import table_exists @@ -95,15 +95,10 @@ def create_indicator_variables(schema_madlib, source_table, out_table, if not is_postgresql: if distributed_by: - dist_str = distributed_by + dist_str = 'distributed by (' + distributed_by + ')' else: - dist_str = ','.join(['"%s"' % i - for i in get_distribution_policy(source_table) - if i is not None]) - if dist_str: - sql_list.append("distributed by (" + dist_str + ")") - else: - sql_list.append("distributed randomly") + dist_str = get_distributed_by(source_table) + sql_list.append(dist_str) plpy.execute(''.join(sql_list)) return None # --------------------------------------------------------------- diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in index 89907b3..1b0069f 100644 --- a/src/ports/postgres/modules/utilities/utilities.py_in +++ b/src/ports/postgres/modules/utilities/utilities.py_in @@ -90,8 +90,8 @@ def warn(condition, msg): # ------------------------------------------------------------------------------ -def get_distribution_policy(source_table): - """ Return a list of columns that define the distribution policy of source_table +def get_distributed_by(source_table): + """ Return a "distributed by (...)" clause that defines distribution policy of source_table Args: @param source_table @@ -100,25 +100,46 @@ def get_distribution_policy(source_table): """ _, table_name = _get_table_schema_names(source_table) schema_name = get_first_schema(source_table) - dist_attr = plpy.execute(""" - SELECT array_agg(pga.attname) as dist_attr - FROM ( - SELECT gdp.localoid, - CASE - WHEN ( ARRAY_UPPER(gdp.attrnums, 1) > 0 ) THEN - UNNEST(gdp.attrnums) - ELSE NULL - END AS attnum - FROM 
gp_distribution_policy gdp - ) AS distkey - INNER JOIN pg_class AS pgc - ON distkey.localoid = pgc.oid AND pgc.relname = '{table_name}' - INNER JOIN pg_namespace pgn - ON pgc.relnamespace = pgn.oid AND pgn.nspname = '{schema_name}' - LEFT OUTER JOIN pg_attribute pga - ON distkey.attnum = pga.attnum AND distkey.localoid = pga.attrelid - """.format(table_name=table_name, schema_name=schema_name))[0]["dist_attr"] - return dist_attr + + # GPDB 6 has pg_get_table_distributedby(<oid>) function to get the + # DISTRIBUTED BY clause of a table. In older version, we have to + # dig the column names based on gp_distribution_policy catalog. + version_wrapper = __mad_version() + if version_wrapper.is_gp_version_less_than("6.0"): + dist_attr = plpy.execute(""" + SELECT array_agg(pga.attname) as dist_attr + FROM ( + SELECT gdp.localoid, + CASE + WHEN ( ARRAY_UPPER(gdp.attrnums, 1) > 0 ) THEN + UNNEST(gdp.attrnums) + ELSE NULL + END AS attnum + FROM gp_distribution_policy gdp + ) AS distkey + INNER JOIN pg_class AS pgc + ON distkey.localoid = pgc.oid AND pgc.relname = '{table_name}' + INNER JOIN pg_namespace pgn + ON pgc.relnamespace = pgn.oid AND pgn.nspname = '{schema_name}' + LEFT OUTER JOIN pg_attribute pga + ON distkey.attnum = pga.attnum AND distkey.localoid = pga.attrelid + """.format(table_name=table_name, schema_name=schema_name))[0]["dist_attr"] + if len(dist_attr) > 0: + dist_str = 'distributed by (' + ','.join(['"%s"' % i + for i in dist_attr + if i is not None]) + ')' + else: + dist_str = 'distributed randomly' + else: + dist_str = plpy.execute(""" + SELECT pg_catalog.pg_get_table_distributedby(pgc.oid) as distributedby + FROM pg_class AS pgc + INNER JOIN pg_namespace pgn ON pgc.relnamespace = pgn.oid + WHERE pgc.relname = '{table_name}' AND pgn.nspname = '{schema_name}' + """.format(table_name=table_name, schema_name=schema_name))[0]["distributedby"] + + return dist_str + # ------------------------------------------------------------------------------