reductionista commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r461300564
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree

Review comment:
   I don't see `Rtree` used anywhere; I think we can get rid of this import.

########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     """.format(**locals())
     result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            idx.add(key1,value1+value1,key1)
+
+        for key1, value1 in data.items():
+
+            v1 = []
+            v2 = []
+            for dim in value1:

Review comment:
   `dim` sounds more like the size of something; I'd go with `x`, which sounds more like a coordinate.

########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     """.format(**locals())
     result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            idx.add(key1,value1+value1,key1)

Review comment:
   Maybe move this outside of the `if`? May as well build the rtree row by row as the points come in.
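   A rough sketch of what that could look like, assuming an rtree `Index` object can be stashed in `SD` between transition calls (and renaming `expr_points` to `point`, per the comment above); this is illustrative only, not the actual implementation:

   ```
   from rtree import index

   def rtree_transition(state, id_in, point, eps, min_samples, metric,
                        n_rows, leaf_id, **kwargs):
       SD = kwargs['SD']
       idx_key = 'idx{0}'.format(leaf_id)
       if idx_key not in SD:
           p = index.Property()
           p.dimension = len(point)
           SD[idx_key] = index.Index(properties=p)
           SD['data{0}'.format(leaf_id)] = {}
           SD['counter{0}'.format(leaf_id)] = 0
       # add each row to the rtree as it arrives, instead of looping
       # over all of data.items() once the last row is in
       SD[idx_key].add(id_in, point + point, id_in)
       SD['data{0}'.format(leaf_id)][id_in] = point
       SD['counter{0}'.format(leaf_id)] += 1
       if SD['counter{0}'.format(leaf_id)] == n_rows[leaf_id]:
           pass  # neighborhood search over the finished rtree goes here
       return [[-1, -1], [-1, -1]]
   ```

   That spreads the `idx.add()` cost across the scan instead of paying it all on the last row.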
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     """.format(**locals())
     result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)

Review comment:
   I wish we could call this variable `point` rather than `expr_points`, since it's an array of numbers representing a single point, not an expression or a list of multiple points or expressions. But I guess this is due to the SQL interface to `rtree_step()`, where it does get passed a column of points or an expression of columns. Maybe we can change the name to `point` just for the transition function, and keep it as-is for the aggregate function? Or would it be too confusing to have them named differently? I can't recall for sure, but I think we've done that elsewhere.

########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     """.format(**locals())
     result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):

Review comment:
   Some general comments about the `rtree_transition()` function:

   This is functionally equivalent to running the DBSCAN algorithm on one leaf's worth of points. But it uses a different algorithm, in that it explores the points in a random order instead of a systematic one (moving from one neighbor to the next in a breadth-first search). It seems like the two would have similar runtime behavior in a typical case, but I'm a bit worried there may be situations where they perform very differently.

   Somewhat related to that: it seems better to prune the edges we won't need now, rather than later, so we have less data to send back to the master. Maybe there's a reason I'm missing why this aggregate function and the snowflake aggregate function have to be separate, but I think they ought to be combined if possible. Being able to do lookups in the rtree might also help the snowflake stage run faster.
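   For reference, a minimal sketch of the systematic breadth-first expansion that classic DBSCAN uses; `range_query()` is a hypothetical stand-in for an eps-neighborhood lookup against the rtree:

   ```
   from collections import deque

   def expand_cluster(seed, range_query, min_samples, labels, cluster_id):
       # breadth-first: each point's neighbors are visited before moving on,
       # so the cluster grows outward from the seed in a systematic order
       queue = deque([seed])
       labels[seed] = cluster_id
       while queue:
           p = queue.popleft()
           neighbors = range_query(p)          # all points within eps of p
           if len(neighbors) >= min_samples:   # p is a core point; expand it
               for q in neighbors:
                   if q not in labels:
                       labels[q] = cluster_id
                       queue.append(q)
   ```

   Compared with iterating over `data.items()` in hash order, this keeps the search localized around points that are already labeled.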
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
     """.format(**locals())
     plpy.execute(sql)
-    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format(
         distance_table, core_points_table, core_edge_table,
-        reachable_points_table))
+        reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+              min_samples, metric, depth):
+
+    n_features = num_features(source_table, expr_point)
+
+    # If squared_dist_norm2 is used, we assume eps is set for the squared distance
+    # That means the border only needs to be sqrt(eps) wide
+    local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+    kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+        schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+    kd_source_table = unique_string(desp='kd_source_table')
+    kd_border_table1 = unique_string(desp='kd_border_table1')
+    kd_border_table2 = unique_string(desp='kd_border_table2')
+
+    dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, kd_border_table1, kd_border_table2))
+
+    output_sql = """
+        CREATE TABLE {kd_source_table} AS
+        SELECT *,
+               CASE {case_when_clause} END AS __leaf_id__
+        FROM {source_table}
+        {dist_leaf_sql}
+        """.format(**locals())
+    plpy.execute(output_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table1} AS
+        SELECT *
+        FROM {source_table}
+        WHERE {border_cl1}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table2} AS
+        SELECT *
+        FROM {source_table}
+        WHERE {border_cl2}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+                  depth, n_features, eps, **kwargs):
+    """
+    KD-tree function to create a partitioning for KNN
+    Args:
+        @param schema_madlib Name of the Madlib Schema
+        @param source_table Training data table
+        @param output_table Name of the table to store kd tree
+        @param expr_point Name of the column with training data
+                          or expression that evaluates to a
+                          numeric array
+        @param depth Depth of the kd tree
+    """
+    with MinWarning("error"):
+
+        n_features = num_features(source_table, expr_point)
+
+        clauses = [' 1=1 ']
+        border_cl1 = ' 1!=1 '
+        border_cl2 = ' 1!=1 '

Review comment:
   I have a slight preference for using `True` and `False` instead of `1=1` and `1!=1`. Either way is fine, I guess.
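   For what it's worth, a sketch of the boolean-literal version of those initializers (Postgres accepts `TRUE`/`FALSE` as boolean literals):

   ```
   clauses = [' TRUE ']
   border_cl1 = ' FALSE '
   border_cl2 = ' FALSE '
   ```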
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
     with MinWarning("warning"):
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth

Review comment:
   I suggest we remove `depth` as a parameter, and instead set:

   ```
   depth = int(log2(num_segments)) + 1
   ```

   i.e., the number of leaves should be the next power of 2 above the number of segments. I can't think of a situation where it would be better to set it to something else. We want the number of leaves to match the number of segments as closely as possible, to reduce extra work in merging the leaves together later and to require fewer rtrees. But it can't be less than the number of segments, otherwise we won't be able to use all the segments in parallel; some will have to sit idle.
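   A minimal sketch of that default in Python, assuming `get_seg_number()` (already imported at the top of this module) returns the segment count; the wiring into `dbscan()` is hypothetical:

   ```
   from math import floor, log

   def default_kd_depth(num_segments):
       # 2**depth leaves; int(log2(n)) + 1 keeps the leaf count strictly
       # above the segment count, so no segment has to sit idle
       return int(floor(log(num_segments, 2))) + 1

   # hypothetical replacement for the depth parameter inside dbscan():
   # depth = default_kd_depth(get_seg_number())
   ```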
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
     """.format(**locals())
     plpy.execute(sql)
-    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format(
         distance_table, core_points_table, core_edge_table,
-        reachable_points_table))
+        reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+              min_samples, metric, depth):
+
+    n_features = num_features(source_table, expr_point)
+
+    # If squared_dist_norm2 is used, we assume eps is set for the squared distance
+    # That means the border only needs to be sqrt(eps) wide
+    local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+    kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+        schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+    kd_source_table = unique_string(desp='kd_source_table')
+    kd_border_table1 = unique_string(desp='kd_border_table1')
+    kd_border_table2 = unique_string(desp='kd_border_table2')
+
+    dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, kd_border_table1, kd_border_table2))
+
+    output_sql = """
+        CREATE TABLE {kd_source_table} AS
+        SELECT *,
+               CASE {case_when_clause} END AS __leaf_id__
+        FROM {source_table}
+        {dist_leaf_sql}
+        """.format(**locals())
+    plpy.execute(output_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table1} AS
+        SELECT *
+        FROM {source_table}
+        WHERE {border_cl1}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table2} AS
+        SELECT *
+        FROM {source_table}
+        WHERE {border_cl2}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+                  depth, n_features, eps, **kwargs):
+    """
+    KD-tree function to create a partitioning for KNN

Review comment:
   Update this docstring: it still says kNN, but here the kd-tree builds a partitioning for DBSCAN.

########## File path: src/ports/postgres/modules/dbscan/dbscan.sql_in ##########

@@ -465,3 +480,119 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan_predict(
 PythonFunction(dbscan, dbscan, dbscan_predict_help)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
+    state1 INTEGER[][],
+    state2 INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
+    state INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
+    state INTEGER[],
+    src BIGINT,
+    dest BIGINT,
+    n_rows INTEGER[],
+    gp_segment_id INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER);
+CREATE AGGREGATE MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER
+)(
+    STYPE=INTEGER[][],
+    SFUNC=MADLIB_SCHEMA.sf_transition,
+    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
+    FINALFUNC=MADLIB_SCHEMA.sf_final
+);
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_transition(
+    state INTEGER[],
+    id_in BIGINT,
+    expr_points DOUBLE PRECISION[],
+    eps DOUBLE PRECISION,
+    min_samples INTEGER,
+    metric VARCHAR,
+    n_rows INTEGER[],
+    leaf_id INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_merge(
+    state1 INTEGER[][],
+    state2 INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_final(
+    state INTEGER[][]
+) RETURNS INTEGER[] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+--id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.rtree_step(

Review comment:
   As I understand it, for SGD the aggregate function is named `madlib_keras_step()` because it's a single step in the long series of steps taken in the gradient descent process, i.e. it's called in a for loop many times. But for the aggregate that builds the rtree, we only call it once, so including `_step()` in the name seems less appropriate here. (Same goes for the `sf_step()` function.)

   Also, `rtree_step` doesn't indicate the purpose of the function, just the data structure used to implement it. It took me a while reading through the code to figure out what this function does and how it fits into the rest. I think naming it something like `find_core_points()` or `build_edge_table()` would increase readability a lot.
########## File path: src/ports/postgres/modules/dbscan/dbscan.py_in ##########

@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
     with MinWarning("warning"):
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
         dist_src_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = '' if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_table, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            rtree_step_table = unique_string(desp='rtree_step_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                rtree_step_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+                CREATE TABLE {rtree_step_table} AS
+                SELECT __leaf_id__,
+                       {schema_madlib}.rtree_step( {id_column},
+                                                   {expr_point},
+                                                   {eps},
+                                                   {min_samples},
+                                                   '{metric}',
+                                                   ARRAY{rt_counts_list},
+                                                   __leaf_id__
+                                                 )
+                FROM {cur_source_table} GROUP BY __leaf_id__

Review comment:
   I recommend we GROUP BY `gp_segment_id` instead of `__leaf_id__` here. We can add all the points in each segment to the same rtree; that will mean fewer rtrees and less to merge later. In some cases it may not help, but if the regions within a single segment happen to be near each other or overlapping, it could help a lot, especially if we let the user control the depth (as they might set it so there are many leaves in each segment). We might even be able to remove the GROUP BY entirely, but that would require changing the merge function a bit.
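   Roughly, the segment-grouped version would look like the sketch below, reusing the variables from the surrounding `dbscan()` code; note the `ARRAY{rt_counts_list}` bookkeeping would also need to hold per-segment rather than per-leaf row counts:

   ```
   # sketch only: one rtree per segment instead of one per leaf
   sql = """
       CREATE TABLE {rtree_step_table} AS
       SELECT gp_segment_id,
              {schema_madlib}.rtree_step( {id_column},
                                          {expr_point},
                                          {eps},
                                          {min_samples},
                                          '{metric}',
                                          ARRAY{rt_counts_list},
                                          gp_segment_id
                                        )
       FROM {cur_source_table} GROUP BY gp_segment_id
       """.format(**locals())
   ```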
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org