reductionista commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r462433675
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```diff
@@ -26,81 +26,297 @@
 from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
                                        [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = '' if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT __t1__.{id_column} AS __src__,
-                       __t2__.{id_column} AS __dest__,
-                       {schema_madlib}.{metric}(
-                           __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT {id_column}, {expr_point} AS __expr_point__
+            FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
+        expr_point = '__expr_point__'
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_view, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            find_core_points_table = unique_string(desp='find_core_points_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                find_core_points_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+                CREATE TABLE {find_core_points_table} AS
+                SELECT __leaf_id__,
+                       {schema_madlib}.find_core_points( {id_column},
+                                                         {expr_point}::DOUBLE PRECISION[],
+                                                         {eps},
+                                                         {min_samples},
+                                                         '{metric}',
+                                                         ARRAY{rt_counts_list},
+                                                         __leaf_id__
+                                                         )
+                FROM {cur_source_table} GROUP BY __leaf_id__
+                {dist_leaf_sql}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+                CREATE TABLE {rt_edge_table} AS
+                SELECT (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__
+                FROM (
+                    SELECT {schema_madlib}.unpack_2d(find_core_points) AS unpacked_2d
+                    FROM {find_core_points_table}
+                ) q1
+                WHERE (unpacked_2d).src NOT IN (SELECT {id_column} FROM {border_table1})
+                {dist_src_sql}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+                CREATE TABLE {rt_core_points_table} AS
+                SELECT DISTINCT(__src__) AS {id_column} FROM {rt_edge_table}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            # # Start border
+            sql = """
+                CREATE TABLE {border_edge_table} AS
+                SELECT __src__, __dest__ FROM (
+                    SELECT __t1__.{id_column} AS __src__,
+                           __t2__.{id_column} AS __dest__,
+                           {schema_madlib}.{metric}(
+                               __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
+                    FROM {border_table1} AS __t1__, {border_table2} AS __t2__)q1
+                WHERE __dist__ < {eps}
+                """.format(**locals())
+            plpy.execute(sql)
```

Review comment:
This query is an O(B^2) operation, where B is the number of boundary points: the same as the brute-force algorithm, but with N replaced by B. At first I thought this wouldn't be too bad, because it seems like B would be very small compared to N. Unfortunately, that reasoning only works well for a low number of dimensions; as the number of dimensions gets higher, the hypervolume of the boundary is not that different from the overall hypervolume. Here's a calculation of the runtime. I think this is the bottleneck for the whole algorithm.

```
D   = dimension of space
N   = number of points
S   = number of segments
eps = epsilon

Hypervolume of space containing all points:    V = L^D
Hypervolume of boundaries:                     O(eps * L^(D-1))
Hypervolume of boundaries between boundaries:  O(eps^2 * L^(D-2))
...
Average density of points:    n = N/V = N/L^D
Points in boundary regions:   B = O(eps * n * L^(D-1)) = (eps/L) * O(N)

Current runtime: (1/S) * (eps/L)^2 * O(N^2)
```

The goal should be to do this in O((N/S) * log(N)). I think we can achieve this if we re-run the `find_core_points()` AGGREGATE on the boundary points, then on the boundaries of the boundaries, etc., until we reach 1D boundaries.
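To put a number on the dimensionality argument, here is a quick back-of-envelope sketch (an illustration, not part of the PR; it assumes one cut per dimension of a hypercube of side L, with hypothetical values eps = 0.1 and L = 1.0). The fraction of the volume lying within eps of at least one cut face tends to 1 as D grows, so B stops being small relative to N:

```python
# Back-of-envelope check: with one cut per dimension on a hypercube of
# side L, the volume fraction farther than eps from every cut face is
# (1 - 2*eps/L)**D, so the boundary fraction approaches 1 as D grows.
# eps and L here are hypothetical illustration values.
def boundary_fraction(D, eps=0.1, L=1.0):
    return 1.0 - (1.0 - 2.0 * eps / L) ** D

for D in (2, 5, 10, 20, 50):
    print(D, round(boundary_fraction(D), 3))
# -> 2 0.36, 5 0.672, 10 0.893, 20 0.988, 50 1.0
```

So already at around D = 10, most points land in some boundary region and the B^2 join behaves much like the brute-force N^2 join.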
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```python
kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
```

Review comment:
I think we can remove this line; `kd_join_clause` is unused.
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```python
sql = """
    CREATE TABLE {border_core_points_table} AS
    SELECT * FROM (
        SELECT __src__ AS {id_column}, count(*) AS __count__
        FROM {border_edge_table} GROUP BY __src__) q1
    WHERE __count__ >= {min_samples}
    {dist_id_sql}
    """.format(**locals())
plpy.execute(sql)
```

Review comment:
The HAVING keyword is for situations like this; it drops the subquery, looks a bit cleaner, and I think it may also reduce the number of slices needed by 1. (Note that HAVING cannot reference a SELECT-list alias in PostgreSQL, so the aggregate is repeated.)

```
CREATE TABLE {border_core_points_table} AS
SELECT __src__ AS {id_column}, count(*) AS __count__
FROM {border_edge_table} GROUP BY __src__
HAVING count(*) >= {min_samples}
{dist_id_sql}
```

(Same for some other similar queries, but I'm not sure how many of them will still look the same after other things get refactored.)
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```python
leaf_id_start = pow(2,depth)-1
```

Review comment:
`leaf_id_start` is also unused.
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```python
sql = """
    CREATE TABLE {rtree_step_table} AS
    SELECT __leaf_id__,
           {schema_madlib}.rtree_step( {id_column},
                                       {expr_point},
                                       {eps},
                                       {min_samples},
                                       '{metric}',
                                       ARRAY{rt_counts_list},
                                       __leaf_id__
                                       )
    FROM {cur_source_table} GROUP BY __leaf_id__
```

Review comment:
I think as long as the leaves are all subsets of the segments, this should be okay. We're just pre-calculating, with the kd-tree, some of the distances that we would have calculated later anyway when the boundary edge table is constructed. I think the kd-tree would still be useful; we'd just be shifting more of the work into the `find_core_points()` aggregate, saving work later.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

```python
def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):

    SD = kwargs['SD']
    if not state:
        data = {}
        SD['counter{0}'.format(leaf_id)] = 0
    else:
        data = SD['data{0}'.format(leaf_id)]

    data[id_in] = expr_points
    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
    SD['data{0}'.format(leaf_id)] = data
    ret = [[-1,-1],[-1,-1]]

    my_n_rows = n_rows[leaf_id]

    if SD['counter{0}'.format(leaf_id)] == my_n_rows:

        core_counts = {}
        core_lists = {}
        p = index.Property()
        p.dimension = len(expr_points)
        idx = index.Index(properties=p)
        ret = []

        if metric == 'dist_norm1':
            fn_dist = distance.cityblock
        elif metric == 'dist_norm2':
            fn_dist = distance.euclidean
        else:
            fn_dist = distance.sqeuclidean

        for key1, value1 in data.items():
            idx.add(key1,value1+value1,key1)
```

Review comment:
We already save the points in SD between each row; this would just be saving them as an rtree data structure instead of a list data structure (or, if we also need the list, we could save both). No pickling is necessary, since the index never gets transferred between segments or written out to disk. However, after reading about this "streaming optimization" for rtree, https://rtree.readthedocs.io/en/latest/performance.html#use-stream-loading, I'm wondering if bulk loading is already optimized inside rtree. If that's the case, then it's probably better to just leave it as is.
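For reference, here is a minimal sketch (not from the PR; the sample points are hypothetical) contrasting the per-point `add()` loop used in `rtree_transition` with the stream-loading constructor described on that rtree performance page, which lets the library build the tree bottom-up in one pass:

```python
from rtree import index

# Hypothetical {id: coords} data standing in for the SD['data...'] dict.
points = {1: [0.0, 0.0], 2: [0.5, 0.5], 3: [5.0, 5.0]}

p = index.Property()
p.dimension = 2

# Incremental build, as in rtree_transition above: one add() per point.
# coords + coords turns a point into a degenerate bounding box (x, y, x, y).
idx_incremental = index.Index(properties=p)
for key, coords in points.items():
    idx_incremental.add(key, coords + coords, key)

# Stream (bulk) loading: pass a generator of (id, bbox, obj) tuples to the
# constructor instead of calling add() row by row.
stream = ((key, coords + coords, key) for key, coords in points.items())
idx_bulk = index.Index(stream, properties=p)

# Both forms answer the same window queries, e.g. candidates within eps:
eps = 1.0
qx, qy = points[1]
print(list(idx_bulk.intersection((qx - eps, qy - eps, qx + eps, qy + eps))))
```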