This is an automated email from the ASF dual-hosted git repository.

jingyimei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new 79bb4be  Fix deparsing of a table's DISTRIBUTED BY clause for GPDB 6.
79bb4be is described below

commit 79bb4be3cd6b43fa7b090dd508c27b71e632ad27
Author: Heikki Linnakangas <heikki.linnakan...@iki.fi>
AuthorDate: Fri Feb 1 18:16:38 2019 +0200

    Fix deparsing of a table's DISTRIBUTED BY clause for GPDB 6.
    
    JIRA: MADLIB-1298
    
    GPDB 6 makes changes to the gp_distribution_policy catalog table:
    'attrnums' column is renamed to 'distkeys', and its datatype changes from
    smallint[] to int2vector. It also adds a new column, distopclasses, which
    needs to be taken into account when looking at a table's distribution key,
    along with the attribute numbers. See GPDB commits 69ec6926c2 and
    242783ae9f for details.
    
    Fortunately, GPDB 6 also adds a helper function,
    pg_catalog.pg_get_distributed_by(), to do the deparsing for us. Use that
    function when running on GPDB 6.
    
    Closes #349
---
 .../modules/utilities/create_indicators.py_in      | 13 ++---
 .../postgres/modules/utilities/utilities.py_in     | 63 ++++++++++++++--------
 2 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/src/ports/postgres/modules/utilities/create_indicators.py_in 
b/src/ports/postgres/modules/utilities/create_indicators.py_in
index fae4fe1..c232e6f 100644
--- a/src/ports/postgres/modules/utilities/create_indicators.py_in
+++ b/src/ports/postgres/modules/utilities/create_indicators.py_in
@@ -29,7 +29,7 @@ database constructs.
 import plpy
 from control import MinWarning
 from utilities import _assert
-from utilities import get_distribution_policy
+from utilities import get_distributed_by
 from utilities import split_quoted_delimited_str
 from utilities import strip_end_quotes
 from validate_args import table_exists
@@ -95,15 +95,10 @@ def create_indicator_variables(schema_madlib, source_table, 
out_table,
 
         if not is_postgresql:
             if distributed_by:
-                dist_str = distributed_by
+                dist_str = 'distributed by (' + distributed_by + ')'
             else:
-                dist_str = ','.join(['"%s"' % i
-                                     for i in 
get_distribution_policy(source_table)
-                                     if i is not None])
-            if dist_str:
-                sql_list.append("distributed by (" + dist_str + ")")
-            else:
-                sql_list.append("distributed randomly")
+                dist_str = get_distributed_by(source_table)
+               sql_list.append(dist_str)
         plpy.execute(''.join(sql_list))
         return None
 # ---------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in 
b/src/ports/postgres/modules/utilities/utilities.py_in
index 89907b3..1b0069f 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -90,8 +90,8 @@ def warn(condition, msg):
 # 
------------------------------------------------------------------------------
 
 
-def get_distribution_policy(source_table):
-    """ Return a list of columns that define the distribution policy of 
source_table
+def get_distributed_by(source_table):
+    """ Return a "distributed by (...)" clause that defines distribution 
policy of source_table
     Args:
         @param source_table
 
@@ -100,25 +100,46 @@ def get_distribution_policy(source_table):
     """
     _, table_name = _get_table_schema_names(source_table)
     schema_name = get_first_schema(source_table)
-    dist_attr = plpy.execute("""
-    SELECT array_agg(pga.attname) as dist_attr
-    FROM (
-        SELECT gdp.localoid,
-                 CASE
-                     WHEN ( ARRAY_UPPER(gdp.attrnums, 1) > 0 ) THEN
-                        UNNEST(gdp.attrnums)
-                     ELSE NULL
-                 END AS attnum
-            FROM gp_distribution_policy gdp
-        ) AS distkey
-        INNER JOIN pg_class AS pgc
-        ON distkey.localoid = pgc.oid AND pgc.relname = '{table_name}'
-        INNER JOIN pg_namespace pgn
-        ON pgc.relnamespace = pgn.oid AND pgn.nspname = '{schema_name}'
-        LEFT OUTER JOIN pg_attribute pga
-        ON distkey.attnum = pga.attnum AND distkey.localoid = pga.attrelid
-    """.format(table_name=table_name, schema_name=schema_name))[0]["dist_attr"]
-    return dist_attr
+
+    # GPDB 6 has pg_get_table_distributedby(<oid>) function to get the
+    # DISTRIBUTED BY clause of a table. In older version, we have to
+    # dig the column names based on gp_distribution_policy catalog.
+    version_wrapper = __mad_version()
+    if version_wrapper.is_gp_version_less_than("6.0"):
+        dist_attr = plpy.execute("""
+        SELECT array_agg(pga.attname) as dist_attr
+        FROM (
+            SELECT gdp.localoid,
+                     CASE
+                         WHEN ( ARRAY_UPPER(gdp.attrnums, 1) > 0 ) THEN
+                            UNNEST(gdp.attrnums)
+                         ELSE NULL
+                     END AS attnum
+                FROM gp_distribution_policy gdp
+            ) AS distkey
+            INNER JOIN pg_class AS pgc
+            ON distkey.localoid = pgc.oid AND pgc.relname = '{table_name}'
+            INNER JOIN pg_namespace pgn
+            ON pgc.relnamespace = pgn.oid AND pgn.nspname = '{schema_name}'
+            LEFT OUTER JOIN pg_attribute pga
+            ON distkey.attnum = pga.attnum AND distkey.localoid = pga.attrelid
+        """.format(table_name=table_name, 
schema_name=schema_name))[0]["dist_attr"]
+        if len(dist_attr) > 0:
+            dist_str = 'distributed by (' + ','.join(['"%s"' % i
+                                                      for i in dist_attr
+                                                      if i is not None]) + ')'
+        else:
+            dist_str = 'distributed randomly'
+    else:
+        dist_str = plpy.execute("""
+        SELECT pg_catalog.pg_get_table_distributedby(pgc.oid) as distributedby
+        FROM pg_class AS pgc
+        INNER JOIN pg_namespace pgn ON pgc.relnamespace = pgn.oid
+        WHERE pgc.relname = '{table_name}' AND pgn.nspname = '{schema_name}'
+        """.format(table_name=table_name, 
schema_name=schema_name))[0]["distributedby"]
+
+    return dist_str
+
 # 
------------------------------------------------------------------------------
 
 

Reply via email to