Repository: incubator-madlib Updated Branches: refs/heads/master 22fd88e42 -> 4e8616b7a
Control: Update 'optimizer' GUC only if editable JIRA: MADLIB-1109 This is applicable only to Greenplum platforms: We disable/enable ORCA using the 'optimizer' GUC in some functions for performance reasons. Greenplum has another GUC, 'optimizer_control', which allows the user to disable updates to the 'optimizer' GUC. Updating 'optimizer' when 'optimizer_control = off' raises an error. This commit adds a check for the value of 'optimizer_control' and updates 'optimizer' only if 'optimizer_control = on'. If the 'optimizer_control' GUC is not available, 'optimizer' is assumed to be editable. Other changes: - Removed the disabling of the optimizer in K-means and in the composite iteration controller, since the underlying platform issues have been fixed. - EnableOptimizer and EnableHashagg have been renamed to OptimizerControl and HashaggControl, since each can both enable and disable its GUC. - Unused debug timing calls (time.time()) have been commented out. Closes #157 Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/4e8616b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/4e8616b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/4e8616b7 Branch: refs/heads/master Commit: 4e8616b7a9c0a21326b906ff534d341fab8a5fa4 Parents: 22fd88e Author: Rahul Iyer <ri...@apache.org> Authored: Tue Aug 8 09:56:13 2017 -0700 Committer: Rahul Iyer <ri...@apache.org> Committed: Tue Aug 8 09:57:57 2017 -0700 ---------------------------------------------------------------------- src/ports/postgres/modules/kmeans/kmeans.sql_in | 60 ++++++----------- src/ports/postgres/modules/lda/lda.py_in | 52 +++++++-------- .../recursive_partitioning/decision_tree.py_in | 13 ++-- .../recursive_partitioning/random_forest.py_in | 12 ++-- .../postgres/modules/utilities/control.py_in | 61 ++++++++++++------ .../modules/utilities/control_composite.py_in | 68 +++++++++----------- 6 files changed, 130 insertions(+), 136 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/kmeans/kmeans.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/kmeans/kmeans.sql_in b/src/ports/postgres/modules/kmeans/kmeans.sql_in index b3cdd55..f11478e 100644 --- a/src/ports/postgres/modules/kmeans/kmeans.sql_in +++ b/src/ports/postgres/modules/kmeans/kmeans.sql_in @@ -218,7 +218,7 @@ closest_column( m, x ) @anchor examples @examp -Note: Your results may not be exactly the same as below due to the nature of the +Note: Your results may not be exactly the same as below due to the nature of the k-means algorithm. -# Prepare some input data: @@ -334,18 +334,18 @@ Result: -# Run the same example as above, but using array input. Create the input table: <pre class="example"> DROP TABLE IF EXISTS km_arrayin CASCADE; -CREATE TABLE km_arrayin(pid int, - p1 float, - p2 float, +CREATE TABLE km_arrayin(pid int, + p1 float, + p2 float, p3 float, - p4 float, - p5 float, + p4 float, + p5 float, p6 float, - p7 float, - p8 float, + p7 float, + p8 float, p9 float, - p10 float, - p11 float, + p10 float, + p11 float, p12 float, p13 float); INSERT INTO km_arrayin VALUES @@ -365,24 +365,24 @@ Now find the cluster assignment for each point: DROP TABLE IF EXISTS km_result; -- Run kmeans algorithm CREATE TABLE km_result AS -SELECT * FROM madlib.kmeans_random('km_arrayin', - 'ARRAY[p1, p2, p3, p4, p5, p6, - p7, p8, p9, p10, p11, p12, p13]', +SELECT * FROM madlib.kmeans_random('km_arrayin', + 'ARRAY[p1, p2, p3, p4, p5, p6, + p7, p8, p9, p10, p11, p12, p13]', 2, 'madlib.squared_dist_norm2', - 'madlib.avg', - 20, + 'madlib.avg', + 20, 0.001); -- Get point assignment -SELECT data.*, (madlib.closest_column(centroids, - ARRAY[p1, p2, p3, p4, p5, p6, - p7, p8, p9, p10, p11, p12, p13])).column_id as cluster_id +SELECT data.*, (madlib.closest_column(centroids, + 
ARRAY[p1, p2, p3, p4, p5, p6, + p7, p8, p9, p10, p11, p12, p13])).column_id as cluster_id FROM km_arrayin as data, km_result ORDER BY data.pid; -</pre> +</pre> This produces the result in column format: <pre class="result"> - pid | p1 | p2 | p3 | p4 | p5 | p6 | p7 | p8 | p9 | p10 | p11 | p12 | p13 | cluster_id + pid | p1 | p2 | p3 | p4 | p5 | p6 | p7 | p8 | p9 | p10 | p11 | p12 | p13 | cluster_id -----+-------+------+------+------+-----+------+------+------+------+--------+------+------+------+------------ 1 | 14.23 | 1.71 | 2.43 | 15.6 | 127 | 2.8 | 3.06 | 0.28 | 2.29 | 5.64 | 1.04 | 3.92 | 1065 | 0 2 | 13.2 | 1.78 | 2.14 | 11.2 | 1 | 2.65 | 2.76 | 0.26 | 1.28 | 4.38 | 1.05 | 3.49 | 1050 | 0 @@ -740,16 +740,11 @@ DECLARE centroids FLOAT8[]; data_size INTEGER; init_size INTEGER; - old_optimizer TEXT; BEGIN oldClientMinMessages := (SELECT setting FROM pg_settings WHERE name = 'client_min_messages'); EXECUTE 'SET client_min_messages TO warning'; - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SHOW optimizer $sql$ into old_optimizer; - EXECUTE $sql$ SET optimizer=off $sql$;', `') -- disable ORCA before MPP-23166 is fixed - PERFORM MADLIB_SCHEMA.__kmeans_validate_src(rel_source); IF (array_upper(initial_centroids,1) IS NULL) THEN @@ -848,8 +843,6 @@ BEGIN IF NOT (theResult IS NULL) THEN theResult.num_iterations = theIteration; END IF; - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SET optimizer=$sql$ || old_optimizer;', `') EXECUTE 'SET client_min_messages TO ' || oldClientMinMessages; RETURN theResult; END; @@ -962,14 +955,10 @@ DECLARE num_points INTEGER; num_centroids INTEGER; num_array_dim INTEGER; - old_optimizer TEXT; BEGIN oldClientMinMessages := (SELECT setting FROM pg_settings WHERE name = 'client_min_messages'); EXECUTE 'SET client_min_messages TO warning'; - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SHOW optimizer $sql$ into old_optimizer; - EXECUTE $sql$ SET optimizer=off $sql$;', `') -- disable ORCA before MPP-23166 
is fixed PERFORM MADLIB_SCHEMA.__kmeans_validate_src(rel_source); PERFORM MADLIB_SCHEMA.__seeding_validate_args( @@ -1045,8 +1034,6 @@ BEGIN WHERE _iteration = $sql$ || theIteration || $sql$ $sql$ INTO theResult; - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SET optimizer=$sql$ || old_optimizer;', `') EXECUTE 'SET client_min_messages TO ' || oldClientMinMessages; IF (seeding_sample_ratio < 1.0) THEN @@ -1658,16 +1645,11 @@ DECLARE proc_fn_dist REGPROCEDURE; rel_filtered VARCHAR; ans DOUBLE PRECISION; - old_optimizer TEXT; BEGIN IF (array_upper(centroids,1) IS NULL) THEN RAISE EXCEPTION 'Kmeans error: No valid centroids given.'; END IF; - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SHOW optimizer $sql$ into old_optimizer; - EXECUTE $sql$ SET optimizer=off $sql$;', `') -- disable ORCA before MPP-23166 is fixed - PERFORM MADLIB_SCHEMA.__kmeans_validate_src(rel_source); -- Validate the expr_point input. Since we don't need a view at this @@ -1707,8 +1689,6 @@ BEGIN $sql$, centroids, proc_fn_dist, fn_dist); - m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', ` - EXECUTE $sql$ SET optimizer=$sql$ || old_optimizer;', `') RETURN ans; END; $$ http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/lda/lda.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/lda/lda.py_in b/src/ports/postgres/modules/lda/lda.py_in index d6be0b1..4011141 100644 --- a/src/ports/postgres/modules/lda/lda.py_in +++ b/src/ports/postgres/modules/lda/lda.py_in @@ -10,11 +10,11 @@ LDA: Driver and auxiliary functions """ import plpy -import time +# import time # use mad_vec to process arrays passed as strings in GPDB < 4.1 and PG < 9.0 -from utilities.control import EnableOptimizer -from utilities.control import EnableHashagg +from utilities.control import OptimizerControl +from utilities.control import HashaggControl from utilities.utilities import __mad_version, _assert, warn 
from utilities.validate_args import output_tbl_valid from utilities.validate_args import input_tbl_valid @@ -102,7 +102,7 @@ class LDATrainer: """.format(output_data_table=self.output_data_table)) def init_random(self): - stime = time.time() + # stime = time.time() # plpy.notice('initializing topics randomly ...') plpy.execute('TRUNCATE TABLE ' + self.work_table_0) @@ -117,11 +117,11 @@ class LDATrainer: topic_num=self.topic_num, data_table=self.data_table)) - etime = time.time() + # etime = time.time() # plpy.notice('\t\ttime elapsed: %.2f seconds' % (etime - stime)) def gen_output_data_table(self): - stime = time.time() + # stime = time.time() # plpy.notice('\t\tgenerating output data table ...') work_table_final = self.work_table_1 @@ -139,11 +139,11 @@ class LDATrainer: """.format(output_data_table=self.output_data_table, topic_num=self.topic_num, work_table_final=work_table_final)) - etime = time.time() + # etime = time.time() # plpy.notice('\t\t\ttime elapsed: %.2f seconds' % (etime - stime)) def iteration(self, it): - stime = time.time() + # stime = time.time() work_table_in = self.work_table_0 if it % 2 == 0: @@ -158,7 +158,7 @@ class LDATrainer: plpy.execute('TRUNCATE TABLE ' + self.model_table) if version_wrapper.is_gp43() or version_wrapper.is_hawq(): - with EnableOptimizer(True): + with OptimizerControl(True): plpy.execute(""" INSERT INTO {model_table} SELECT @@ -183,7 +183,7 @@ class LDATrainer: work_table_in=work_table_in)) else: # work around insertion memory error (MPP-25561) - # by taking the model in Python temporarily + # by copying the model to Python temporarily model = plpy.execute(""" SELECT {schema_madlib}.__lda_count_topic_agg( @@ -215,7 +215,7 @@ class LDATrainer: # For GPDB 4.3 and higher we disable the optimzer (ORCA) for the query # planner since currently ORCA doesn't support InitPlan. This would have # to be fixed when ORCA is the only available query planner. 
- with EnableOptimizer(False): + with OptimizerControl(False): # plpy.notice('\t\tsampling ...') plpy.execute('TRUNCATE TABLE ' + work_table_out) query = """ @@ -238,23 +238,23 @@ class LDATrainer: work_table_in=work_table_in) plpy.execute(query) - etime = time.time() + # etime = time.time() # plpy.notice('\t\ttime elapsed: %.2f seconds' % (etime - stime)) def run(self): - stime = time.time() + # stime = time.time() # plpy.notice('start training process ...') self.init_random() - sstime = time.time() + # sstime = time.time() for it in range(1, self.iter_num + 1): self.iteration(it) - eetime = time.time() + # eetime = time.time() # plpy.notice('\t\titeration done, time elapsed: %.2f seconds' % (eetime - sstime)) self.gen_output_data_table() - etime = time.time() + # etime = time.time() # plpy.notice('finished, time elapsed: %.2f seconds' % (etime - stime)) # ------------------------------------------------------------------------------ @@ -330,7 +330,7 @@ class LDAPredictor: """.format(doc_topic=self.doc_topic)) def init_random(self): - stime = time.time() + # stime = time.time() # plpy.notice('initializing topics randomly ...') plpy.execute('TRUNCATE TABLE ' + self.work_table_0) @@ -345,7 +345,7 @@ class LDAPredictor: topic_num=self.topic_num, data_table=self.test_table)) - etime = time.time() + # etime = time.time() # plpy.notice('\t\ttime elapsed: %.2f seconds' % (etime - stime)) def gen_output_table(self): @@ -361,13 +361,13 @@ class LDAPredictor: work_table_out=self.work_table_1)) def infer(self): - stime = time.time() + # stime = time.time() # plpy.notice('infering ...') # For GPDB 4.3 and higher we disable the optimzer (ORCA) for the query # planner since currently ORCA doesn't support InitPlan. This would have # to be fixed when ORCA is the only available query planner. 
- with EnableOptimizer(False): + with OptimizerControl(False): query = """ INSERT INTO {work_table_out} SELECT @@ -391,18 +391,18 @@ class LDAPredictor: iter_num=self.iter_num) plpy.execute(query) - etime = time.time() + # etime = time.time() # plpy.notice('\t\ttime elapsed: %.2f seconds' % (etime - stime)) def run(self): - stime = time.time() + # stime = time.time() # plpy.notice('start prediction process ...') self.init_random() self.infer() self.gen_output_table() - etime = time.time() + # etime = time.time() # plpy.notice('finished, time elapsed: %.2f seconds' % (etime - stime)) # ------------------------------------------------------------------------------ @@ -707,7 +707,7 @@ def get_perplexity(schema_madlib, model_table, output_data_table): # For GPDB 4.3 and higher we disable the optimzer (ORCA) for the query # planner since currently ORCA doesn't support InitPlan. This would have # to be fixed when ORCA is the only available query planner. - with EnableOptimizer(False): + with OptimizerControl(False): query = """ SELECT exp(-part_perp/total_word) AS perp FROM @@ -894,8 +894,8 @@ def _convert_data_table(schema_madlib, data_table): DISTRIBUTED BY (docid)') """.format(convt_table=convt_table)) - with EnableOptimizer(False): - with EnableHashagg(False): + with OptimizerControl(False): + with HashaggControl(False): plpy.execute(""" INSERT INTO {convt_table} SELECT http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in index d82936a..35fc3f9 100644 --- a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in +++ b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in @@ -16,8 +16,8 @@ from collections import Iterable from 
validation.cross_validation import cross_validation_grouping_w_params from utilities.control import MinWarning -from utilities.control import EnableOptimizer -from utilities.control import EnableHashagg +from utilities.control import OptimizerControl +from utilities.control import HashaggControl from utilities.validate_args import get_cols from utilities.validate_args import get_cols_and_types from utilities.validate_args import _get_table_schema_names @@ -30,7 +30,6 @@ from utilities.validate_args import unquote_ident from utilities.utilities import _assert from utilities.utilities import extract_keyvalue_params from utilities.utilities import unique_string -from utilities.utilities import _array_to_string from utilities.utilities import add_postfix from utilities.utilities import split_quoted_delimited_str from utilities.utilities import py_list_to_sql_string @@ -437,10 +436,10 @@ def _get_tree_states(schema_madlib, is_classification, split_criterion, else: grouping_array_str = get_grouping_array_str(training_table_name, grouping_cols) - with EnableOptimizer(False): + with OptimizerControl(False): # we disable optimizer (ORCA) for platforms that use it # since ORCA doesn't provide an easy way to disable hashagg - with EnableHashagg(False): + with HashaggControl(False): # we disable hashagg since large number of groups could # result in excessive memory usage. plpy.notice("Analyzing data to compute split boundaries for variables") @@ -1841,10 +1840,10 @@ def tree_predict(schema_madlib, model, source, output, pred_type='response', """ sql = sql.format(**locals()) with MinWarning('warning'): - with EnableOptimizer(False): + with OptimizerControl(False): # we disable optimizer (ORCA) for platforms that use it # since ORCA doesn't provide an easy way to disable hashagg - with EnableHashagg(False): + with HashaggControl(False): # we disable hashagg since large number of groups could # result in excessive memory usage. 
plpy.execute(sql) http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/recursive_partitioning/random_forest.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/recursive_partitioning/random_forest.py_in b/src/ports/postgres/modules/recursive_partitioning/random_forest.py_in index bc4bd97..a28f33f 100644 --- a/src/ports/postgres/modules/recursive_partitioning/random_forest.py_in +++ b/src/ports/postgres/modules/recursive_partitioning/random_forest.py_in @@ -11,8 +11,8 @@ import plpy from math import sqrt, ceil from utilities.control import MinWarning -from utilities.control import EnableOptimizer -from utilities.control import EnableHashagg +from utilities.control import OptimizerControl +from utilities.control import HashaggControl from utilities.utilities import _assert from utilities.utilities import unique_string from utilities.utilities import add_postfix @@ -266,10 +266,10 @@ def forest_train( msg_level = "'notice'" if verbose else "'warning'" with MinWarning(msg_level): - with EnableOptimizer(False): + with OptimizerControl(False): # we disable optimizer (ORCA) for platforms that use it # since ORCA doesn't provide an easy way to disable hashagg - with EnableHashagg(False): + with HashaggControl(False): # we disable hashagg since large number of groups could # result in excessive memory usage. # set default values @@ -809,10 +809,10 @@ def forest_predict(schema_madlib, model, source, output, pred_type='response', """.format(**locals()) with MinWarning('warning'): - with EnableOptimizer(False): + with OptimizerControl(False): # we disable optimizer (ORCA) for platforms that use it # since ORCA doesn't provide an easy way to disable hashagg - with EnableHashagg(False): + with HashaggControl(False): # we disable hashagg since large number of groups could # result in excessive memory usage. 
plpy.notice("sql_prediction:\n"+sql_prediction) http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/utilities/control.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/utilities/control.py_in b/src/ports/postgres/modules/utilities/control.py_in index b031701..bd37bac 100644 --- a/src/ports/postgres/modules/utilities/control.py_in +++ b/src/ports/postgres/modules/utilities/control.py_in @@ -11,6 +11,7 @@ m4_changequote(`<!', `!>') @brief driver functions shared by modules """ +from distutils.util import strtobool import plpy from utilities import __mad_version @@ -24,59 +25,77 @@ HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <! UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>) -class EnableOptimizer(object): +class OptimizerControl(object): """ @brief: A wrapper that enables/disables the optimizer and then sets it back to the original value on exit + + This context manager accesses two GUCs: + optimizer: GUC to control the optimizer used in Greenplum and HAWQ platforms + + optimizer_control: Used to check if 'optimizer' GUC can be updated by MADlib. 
+ This is assumed to be True if the GUC is not available """ - def __init__(self, to_enable=True): - self.to_enable = to_enable - self.optimizer_enabled = False - # we depend on the fact that all GPDB/HAWQ versions that have the + def __init__(self, enable=True, error_on_fail=False): + self.to_enable = enable + self.error_on_fail = error_on_fail + + # use the fact that all GPDB/HAWQ versions that have the # optimizer also define function properties self.guc_exists = True if HAS_FUNCTION_PROPERTIES else False def __enter__(self): - # we depend on the fact that all GPDB/HAWQ versions that have the ORCA - # optimizer also define function properties if self.guc_exists: - optimizer = plpy.execute("show optimizer")[0]["optimizer"] - self.optimizer_enabled = True if optimizer == 'on' else False - plpy.execute("set optimizer={0}".format(('off', 'on')[self.to_enable])) + # check if allowed to change the GUC + try: + self.optimizer_control = bool(strtobool( + plpy.execute("show optimizer_control")[0]["optimizer_control"])) + except plpy.SPIError: + self.optimizer_control = True + + if self.optimizer_control: + self.optimizer_enabled = bool(strtobool( + plpy.execute("show optimizer")[0]["optimizer"])) + new_optimizer = 'on' if self.to_enable else 'off' + plpy.execute("set optimizer={0}".format(new_optimizer)) + else: + if self.error_on_fail: + plpy.error("Unable to change 'optimizer' value. " + "Set 'optimizer_control = on' to proceed.") def __exit__(self, *args): if args and args[0]: - # an exception was raised in code. We return False so that any + # an exception was raised in code, return False so that any # exception is re-raised after exit. The transaction will not - # commit leading to reset of client_min_messages. + # commit leading to reset of any change to parameter. return False else: - if self.guc_exists: + if self.guc_exists and self.optimizer_control: plpy.execute("set optimizer={0}". 
format(('off', 'on')[self.optimizer_enabled])) -class EnableHashagg(object): +class HashaggControl(object): """ @brief: A wrapper that enables/disables the hashagg and then sets it back to the original value on exit """ - def __init__(self, to_enable=True): - self.to_enable = to_enable + def __init__(self, enable=True): + self.to_enable = enable self.hashagg_enabled = False self.guc_exists = True def __enter__(self): try: enable_hashagg = plpy.execute("show enable_hashagg")[0]["enable_hashagg"] - self.hashagg_enabled = True if enable_hashagg == 'on' else False + self.hashagg_enabled = bool(strtobool(enable_hashagg)) plpy.execute("set enable_hashagg={0}". format(('off', 'on')[self.to_enable])) - except: + except plpy.SPIError: self.guc_exists = False finally: return self @@ -85,7 +104,7 @@ class EnableHashagg(object): if args and args[0]: # an exception was raised in code. We return False so that any # exception is re-raised after exit. The transaction will not - # commit leading to reset of client_min_messages. + # commit leading to reset of parameter value. return False else: if self.guc_exists: @@ -218,7 +237,7 @@ class IterationController: # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner # since currently ORCA has a bug for left outer joins (MPP-21868). # This should be removed when the issue is fixed in ORCA. - with EnableOptimizer(False): + with OptimizerControl(False): if STATE_IN_MEM: eval_plan = plpy.prepare(""" SELECT ({expression}) AS expression @@ -346,7 +365,7 @@ class IterationController2D(IterationController): # We disable the optimizer (ORCA) for the query planning # since ORCA has a bug for left outer joins (MPP-21868). # This should be removed when the issue is fixed in ORCA. 
- with EnableOptimizer(False): + with OptimizerControl(False): if STATE_IN_MEM: eval_plan = plpy.prepare(""" SELECT ({expression}) AS expression http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/4e8616b7/src/ports/postgres/modules/utilities/control_composite.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/utilities/control_composite.py_in b/src/ports/postgres/modules/utilities/control_composite.py_in index ad0e7ef..1e5b836 100644 --- a/src/ports/postgres/modules/utilities/control_composite.py_in +++ b/src/ports/postgres/modules/utilities/control_composite.py_in @@ -17,7 +17,7 @@ from utilities import __mad_version version_wrapper = __mad_version() from utilities import unique_string _unique_string = unique_string -from control import MinWarning, EnableOptimizer +from control import MinWarning STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>) @@ -115,39 +115,35 @@ class IterationControllerComposite: @return None if \c expression evaluates to NULL, otherwise the value of \c expression """ - # We disable the optimzer (ORCA) for the query planning - # since ORCA has a bug for left outer joins (MPP-21868). - # This should be removed when the issue is fixed in ORCA. 
- with EnableOptimizer(False): - if STATE_IN_MEM: - cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4, $5) AS {schema_madlib}.kmeans_state)".format(**self.kwargs) - cast_type = ["DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION[]", - "DOUBLE PRECISION", "DOUBLE PRECISION"] - cast_para = [None if self.new_state is None else self.new_state[i] - for i in ('centroids', 'old_centroid_ids', 'cluster_variance', - 'objective_fn', 'frac_reassigned')] - eval_plan = plpy.prepare(""" - SELECT ({expression}) AS expression - FROM {{rel_args}} AS _args - LEFT OUTER JOIN ( - SELECT {{schema_madlib}}.array_to_2d($1) AS _state - ) AS _state ON True - """.format(expression=expression). - format(iteration=self.iteration, - curr_state=cast_str, **self.kwargs), cast_type) - resultObject = plpy.execute(eval_plan, cast_para) - else: - resultObject = self.runSQL(""" - SELECT ({expression}) AS expression - FROM {{rel_args}} AS _args - LEFT OUTER JOIN ( - SELECT * - FROM {{rel_state}} AS _state - WHERE _state._iteration = {{iteration}} - ) AS _state ON True - """.format(expression=expression). - format(iteration=self.iteration, - **self.kwargs)) + if STATE_IN_MEM: + cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4, $5) AS {schema_madlib}.kmeans_state)".format(**self.kwargs) + cast_type = ["DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION[]", + "DOUBLE PRECISION", "DOUBLE PRECISION"] + cast_para = [None if self.new_state is None else self.new_state[i] + for i in ('centroids', 'old_centroid_ids', 'cluster_variance', + 'objective_fn', 'frac_reassigned')] + eval_plan = plpy.prepare(""" + SELECT ({expression}) AS expression + FROM {{rel_args}} AS _args + LEFT OUTER JOIN ( + SELECT {{schema_madlib}}.array_to_2d($1) AS _state + ) AS _state ON True + """.format(expression=expression). 
+ format(iteration=self.iteration, + curr_state=cast_str, **self.kwargs), cast_type) + resultObject = plpy.execute(eval_plan, cast_para) + else: + resultObject = self.runSQL(""" + SELECT ({expression}) AS expression + FROM {{rel_args}} AS _args + LEFT OUTER JOIN ( + SELECT * + FROM {{rel_state}} AS _state + WHERE _state._iteration = {{iteration}} + ) AS _state ON True + """.format(expression=expression). + format(iteration=self.iteration, + **self.kwargs)) if resultObject.nrows() == 0: return None @@ -207,9 +203,9 @@ class IterationControllerComposite: cast_para = [None if self.new_state is None else self.new_state[i] for i in ('centroids', 'old_centroid_ids', 'cluster_variance', - 'objective_fn', 'frac_reassigned')] + 'objective_fn', 'frac_reassigned')] cast_para.extend([None if self.old_state is None else self.old_state[i] - for i in ('centroids', 'old_centroid_ids', 'cluster_variance', + for i in ('centroids', 'old_centroid_ids', 'cluster_variance', 'objective_fn', 'frac_reassigned')]) updateKwargs.update(curr_state=cast_str, old_state=cast_str_old)