This is an automated email from the ASF dual-hosted git repository.

domino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit dfbd29d0c6e21abeb9eef6a0968bd8fbc58b37ba
Author: Domino Valdano <dvald...@vmware.com>
AuthorDate: Fri Aug 7 15:44:10 2020 -0700

DBSCAN: Fast parallel-optimized DBSCAN

This optimized version of DBSCAN offers a dramatic improvement over the original brute-force implementation. It should have roughly O(N/S log N) runtime, where N is the size of the input dataset (number of points/rows) and S is the number of segments used. (Note: the algorithm now decides on its own how many segments to use, as sometimes splitting things up further will only increase the runtime. Therefore S is not necessarily the total number of segments in the cluster; it may be less, depending on the structure of the dataset.)

This borrows many aspects from the previous attempt (using an overlapping spatial binary tree to segment the dataset and an R-tree index to speed up range queries), but differs in its approach in other ways.

The brute-force DBSCAN runs in O(N^2) time. To improve on this, we split the data into different overlapping regions, run DBSCAN on each in parallel, then merge the results together on the coordinator. More specifically:

1. The data is segmented into connected spatial regions, using an in-house binary tree designed and optimized specifically for DBSCAN. This custom DBSCAN-optimized tree is similar to a kd-tree, but tries to simultaneously keep the child nodes of each node as balanced as possible while splitting along a hyperplane that favors passing through the least dense regions of the space. To accomplish this, it constructs a coarse-grained density map of each node before deciding where to split, minimizing a loss function which estimates the longest expected runtime of any segment assigned to the descendants of that node.

2. Each leaf of the optimized spatial tree runs the DBSCAN algorithm on the points in its spatial region, including some points from other regions near its boundaries, using an R-tree index for efficient range queries.

3.
Merge the clusters found in each leaf with each other, by keeping track of which points in neighboring leaves are within eps of a cluster's home leaf. Uses madlib's wcc (weakly connected components) graph module to identify the equivalence classes of clusters across all leaves.

================ Partial List of Detailed Improvements ==============

- Initial build_kdtree() re-write

Previously, we were including a lot of points that were nowhere near the border as border points. The new version should only include those which are actually within epsilon distance of a border. It also outputs the information in a different format that provides a bit more information, and will make a fast merge of clusters from different segments possible in a later stage.

The output of build_kdtree (renamed to build_optimized_tree) is now a single table of annotated points instead of many tables. The __leaf_id__ column has been split into two similar concepts, __leaf_id__ and __dist_id__. __leaf_id__ tells you which leaf a point is in, and __dist_id__ tells you which (extended) leaf a row is in--which determines the segment on which it will be stored. Note that each point can have several entries in the segmented_output table. There is not a 1-to-1 correspondence between points and rows, since we store each border point in multiple overlapping leaves (segments) at once.

- Convert find_core_points() UDA to dbscan_leaf() UDF

- Add dbscan_record class, for managing the input and output of dbscan_leaf (maps to the dbscan_record SQL TYPE, but with some hidden fields added for keeping track of record-specific variables in python; these are automatically removed before yielding the final result, when the record gets cloned via the copy constructor)

- Allow passing id_column as an expression: the id_column input param is now handled the same way as the point column, so that any expression evaluating to an integer type should work instead of requiring the id to be just a single column.
The name of the column in the output table will now always be 'id', but the actual expression used on the original input table is still stored in the summary table.

- Use sqrt(eps) only for kd_tree, raw eps for dbscan_leaf

The latter compares a squared distance to a squared-distance threshold, while generating the kd-tree requires an actual distance.

- Further re-write of build_kd_tree()

It was taking 30s to generate the overlapping kd-tree for 10,000 points, because we were doing many long queries, iteratively in python. And it took several minutes for 100,000 points. But this should be the fast part!

Now, it generates the non-overlapping kd-tree first, then adds the external points for the overlap regions. The generation of the regular kd-tree, along with the cutoff table, is working now and is very fast (< 1s for 10,000 points, < 5s for 100,000 points). It's just a single 0.5s recursive query for generating the segmented source (without external point augmentation), and an even simpler 0.1s query afterwards for the cutoffs. I don't expect adding the overlap points to take too much time either, as we already have the non-overlapping cutoffs. We can generate a leaf-bounds table from those (min & max boundary points for each leaf) without even touching the source table, and should be able to join it to the source table in a final query which scans through N/S * max_depth rows per segment (similar to the first two fast queries).

- Add print_timings debug statements, for detailed performance profiling

- Change __dist_id__ to dist_id and add a new __dist_id__

This adds another layer of abstraction to the mapping from leaves to dist_id's to segments, but was needed to ensure that the leaves are balanced among the physical cluster segments... otherwise, for a small number of leaves, the dist_id's map somewhat randomly to the segments, and several can end up on the same segment while other segments handle no data at all.
This will always place leaves on the segments in a round-robin fashion, rather than based on gpdb's internal DISTRIBUTED BY hash function.

- Move dbscan_optimized into its own class

- Fix use of wrong eps in build_kd_tree (there were cases where we were using eps instead of eps^2 for the squared-dist-norm)

- Add functions for optimized tree segmentation (uses a loss function instead of the kd-tree for making cuts)

- Skip missing dist_id's while creating dist_map

If all the dist_id's were sequential with no gaps, as with the kd-tree, this wouldn't be necessary. But now, since some branches of the tree stop splitting before others, we need to account for these missing dist_id's... otherwise you can end up with several dist_id's on the same segment and other segments with nothing.

- Accelerate range_query() using vectorized numpy operations

- Move many functions into a self-contained DBSCANStorage class

This makes things much easier to manage; fewer params to pass around.

- Early removal of points from rtree, only storing the minimum necessary

Delete each point from the rtree as soon as the first range query is run on it, to speed up range queries run on neighboring points. The smaller the tree, the faster the results get returned. (Searching the tree for a single point is O(log N), but a range query returns many points... so it's at *least* O(log N + k), and possibly more like O(k log N), where k is the number of candidate neighbors returned, which we have to go through one at a time to check whether they are actually in range.)

Deleting points as soon as we label them, with no further changes, would throw off the calculation of is_core_point. We can still delete them early, but to ensure is_core_point is right we need to keep track of the number of neighbors each internal point has. Some of those neighbors are internal and others are external; the sum of both determines the final value of is_core_point. After doing this, a lot of things we were previously doing no longer made sense.
Since we need to query the external neighbors of the internal points anyway, we may as well do all of the inverse queries up front instead (searching for internal neighbors of each external point). In general, external points are expected to be fewer than internal points, so hopefully this will be faster. But then we have no more use for the cluster-specific trees (unless we want to add them back for use by the internal points?). range_query no longer has min_results or max_results params.

- All examples passing, one each for 3D, 4D, 9D, 16D, 25D, 36D, 49D, and 64D, tested on both a 3-segment cluster and a 16-segment cluster

- Change default depth to num_segs instead of log_2(num_segs)

log_2(num_segs) made sense for the kd_tree, because each node of the tree is always split exactly in half, resulting in a balanced binary tree. With the optimized tree algorithm, splits can happen anywhere; there can be 1 segment on the left and 50 on the right, and then later down the tree the 50 get split up more while the 1 does not. In the worst case, one node gets split off from the rest at each cut, meaning that in order to populate num_segs leaves you need num_segs cuts. This could result in a longer optimization time, but the optimization phase is usually very fast compared to the actual parallel-DBSCAN phase that follows. It should only matter for small datasets, and the user always has the option of setting a lower max_depth than the default if optimization is taking too much time.

- Cast input to DOUBLE PRECISION[] in source_view (this is necessary to be able to handle input tables where the points column is REAL[], INTEGER[], etc.)
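The density-map-driven cut selection described in item 1 above can be sketched in Python. This is a simplified, hypothetical stand-in (the function name `choose_split` and the score are invented here, and the real module does this in SQL with a loss function that estimates per-segment runtimes): each dimension is binned at width eps, and candidate cuts at bin boundaries are scored to favor a balanced split through a sparse bin.

```python
import numpy as np

def choose_split(points, eps):
    """Pick a (dimension, coordinate) cut for one node of the spatial tree.

    For each dimension, build a coarse histogram with bin width eps, then
    score each bin-boundary cut by how unbalanced the resulting split is
    plus how dense the bin just below the cut is. Lower score is better.
    """
    n, n_dims = points.shape
    best = None  # (score, dim, cut)
    for d in range(n_dims):
        lo = points[:, d].min()
        hi = points[:, d].max() + eps          # pad the upper bound by eps
        n_bins = max(int((hi - lo) // eps), 1)
        density, edges = np.histogram(points[:, d], bins=n_bins, range=(lo, hi))
        left = np.cumsum(density)              # points to the left of each boundary
        for b in range(n_bins - 1):
            cut = float(edges[b + 1])
            imbalance = abs(2 * left[b] - n) / n   # 0 means a perfectly even split
            sparsity = density[b] / n              # prefer cutting through sparse bins
            score = imbalance + sparsity
            if best is None or score < best[0]:
                best = (score, d, cut)
    if best is None:  # every dimension fits inside a single eps bin
        return 0, float((points[:, 0].min() + points[:, 0].max()) / 2)
    return best[1], best[2]
```

With two well-separated blobs along the first axis, the sketch cuts along that axis, through the empty region between them.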
- Rename depth -> max_segmentation_depth

- Rename "kd_tree" method option to "optimized"

- Remove unused kd_tree code

- Fix a subtle issue involving early removal of border points from the rtree

If range_query is called on a point in the outer for loop (where we're looking for a new cluster seed, not growing a cluster yet), then it will be tentatively labelled as NOISE_POINT... but it may be assigned to a cluster later, if it borders a core point while we're growing a cluster. Instead of keeping these points in the rtree, we go ahead and delete them (same as for core points or noise points, which we know won't need further processing), but we must keep track of a list of _possible_border_points in each internal db_rec. During update_neighbors (called by range_query), we can also add the id to each of its unlabelled neighbors' _possible_border_points lists, in case they turn out to be core points later. (None of them will be labelled as core points yet, since anything marked as a core point has already been removed from the tree; but there may be some NOISE_POINT's; we skip the neighboring NOISE_POINT's since we know they won't ever be core points.)
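The cross-leaf merge in item 3 above can be sketched with a plain union-find standing in for madlib's wcc (weakly connected components) module. The names here are hypothetical: clusters are labelled by (leaf_id, local_cluster_id) pairs, and each edge is assumed to come from a point that lies within eps of a core point and appears in the overlap region of two neighboring leaves.

```python
def merge_leaf_clusters(local_clusters, crossborder_edges):
    """Collapse per-leaf cluster labels into global equivalence classes.

    local_clusters:    iterable of (leaf_id, local_cluster_id) labels
    crossborder_edges: pairs of labels known to denote the same cluster
                       because a border point is shared by both leaves

    Returns a dict mapping each local label to its class representative.
    """
    parent = {c: c for c in local_clusters}

    def find(c):
        while parent[c] != c:
            parent[c] = parent[parent[c]]  # path halving
            c = parent[c]
        return c

    # Union the two endpoints of every cross-border edge
    for a, b in crossborder_edges:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    # Relabel every local cluster with its final representative
    return {c: find(c) for c in local_clusters}
```

For example, if cluster 1 on leaf 0 touches cluster 1 on leaf 1, which in turn touches cluster 1 on leaf 2, all three collapse into one global cluster while unrelated clusters keep their own label.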
- Add scripts for creating blobs for perf testing - Add scripts for generating and running dbscan perf tests --- .gitignore | 1 + src/ports/postgres/modules/dbscan/dbscan.py_in | 1696 +++++++++++++++----- src/ports/postgres/modules/dbscan/dbscan.sql_in | 379 +++-- .../postgres/modules/dbscan/test/dbscan.sql_in | 142 +- .../dbscan/test/unit_tests/test_dbscan.py_in | 348 +++- 5 files changed, 1971 insertions(+), 595 deletions(-) diff --git a/.gitignore b/.gitignore index ad9a296..2cbb50b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Ignore build directory /build* /build-docker* +/src/ports/postgres/modules/dbscan/test/perf/blobs # Ignore generated code files *.so diff --git a/src/ports/postgres/modules/dbscan/dbscan.py_in b/src/ports/postgres/modules/dbscan/dbscan.py_in index 40e1205..00986c5 100644 --- a/src/ports/postgres/modules/dbscan/dbscan.py_in +++ b/src/ports/postgres/modules/dbscan/dbscan.py_in @@ -23,7 +23,7 @@ from utilities.control import MinWarning from utilities.utilities import _assert from utilities.utilities import unique_string from utilities.utilities import add_postfix -from utilities.utilities import NUMERIC, ONLY_ARRAY +from utilities.utilities import INTEGER, NUMERIC, ONLY_ARRAY from utilities.utilities import is_valid_psql_type from utilities.utilities import is_platform_pg from utilities.utilities import num_features @@ -36,10 +36,16 @@ from utilities.validate_args import get_algorithm_name from graph.wcc import wcc from math import log -from math import floor +from math import ceil from math import sqrt +from time import time +import numpy as np +from collections import deque -from scipy.spatial import distance +import utilities.debug as DEBUG +DEBUG.plpy_info_enabled = False +DEBUG.plpy_execute_enabled = False +DEBUG.timings_enabled = False try: from rtree import index @@ -48,323 +54,219 @@ except ImportError: else: RTREE_ENABLED=1 -BRUTE_FORCE = 'brute_force' -KD_TREE = 'kd_tree' +METHOD_BRUTE_FORCE = 'brute_force' 
+METHOD_OPTIMIZED = 'optimized' DEFAULT_MIN_SAMPLES = 5 -DEFAULT_KD_DEPTH = 3 DEFAULT_METRIC = 'squared_dist_norm2' def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, - eps, min_samples, metric, algorithm, depth, **kwargs): + eps, min_samples, metric, algorithm, max_segmentation_depth, **kwargs): with MinWarning("warning"): - + # algorithm=None is handled in get_algorithm_name() min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples metric = DEFAULT_METRIC if not metric else metric - algorithm = BRUTE_FORCE if not algorithm else algorithm - depth = DEFAULT_KD_DEPTH if not depth else depth + num_segs = get_seg_number() + + algorithm = get_algorithm_name(algorithm, METHOD_OPTIMIZED, + [METHOD_BRUTE_FORCE, METHOD_OPTIMIZED], 'DBSCAN') - algorithm = get_algorithm_name(algorithm, BRUTE_FORCE, - [BRUTE_FORCE, KD_TREE], 'DBSCAN') + if max_segmentation_depth is None: + # Default to num_segs + max_depth = num_segs + else: + max_depth = max_segmentation_depth + if algorithm != METHOD_OPTIMIZED: + plpy.warning("Ignoring max_segmentation_depth={} param, " + "N/A to algorithm={}".format(max_depth, algorithm)) _validate_dbscan(schema_madlib, source_table, output_table, id_column, - expr_point, eps, min_samples, metric, algorithm, depth) + expr_point, eps, min_samples, metric, algorithm, max_depth) - dist_src_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)' - dist_id_sql = '' if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column) - dist_reach_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)' - dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)' + dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)' + dist_by_id = '' if is_platform_pg() else 'DISTRIBUTED BY (id)' + dist_by_reach_id = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)' + dist_by_seg_id = '' if is_platform_pg() else 'DISTRIBUTED BY (seg_id)' + distributed_by = '' if is_platform_pg() 
else 'DISTRIBUTED BY (__dist_id__)' core_points_table = unique_string(desp='core_points_table') core_edge_table = unique_string(desp='core_edge_table') distance_table = unique_string(desp='distance_table') - plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format( - core_points_table, core_edge_table, distance_table)) source_view = unique_string(desp='source_view') plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view)) sql = """ CREATE VIEW {source_view} AS - SELECT {id_column}, {expr_point} AS __expr_point__ + SELECT ({id_column})::BIGINT AS id, ({expr_point})::DOUBLE PRECISION[] AS point FROM {source_table} """.format(**locals()) plpy.execute(sql) - expr_point = '__expr_point__' - - if algorithm == KD_TREE: - cur_source_table, border_table1, border_table2 = dbscan_kd( - schema_madlib, source_view, id_column, expr_point, eps, - min_samples, metric, depth) - - sql = """ - SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__ - """.format(**locals()) - result = plpy.execute(sql) - rt_counts_dict = {} - for i in result: - rt_counts_dict[i['__leaf_id__']] = int(i['count']) - rt_counts_list = [] - for i in sorted(rt_counts_dict): - rt_counts_list.append(rt_counts_dict[i]) - - find_core_points_table = unique_string(desp='find_core_points_table') - rt_edge_table = unique_string(desp='rt_edge_table') - rt_core_points_table = unique_string(desp='rt_core_points_table') - border_core_points_table = unique_string(desp='border_core_points_table') - border_edge_table = unique_string(desp='border_edge_table') - plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format( - find_core_points_table, rt_edge_table, rt_core_points_table, - border_core_points_table, border_edge_table)) - - sql = """ - CREATE TABLE {find_core_points_table} AS - SELECT __leaf_id__, - {schema_madlib}.find_core_points( {id_column}, - {expr_point}::DOUBLE PRECISION[], - {eps}, - {min_samples}, - '{metric}', - ARRAY{rt_counts_list}, - __leaf_id__ - ) - FROM 
{cur_source_table} GROUP BY __leaf_id__ - {dist_leaf_sql} - """.format(**locals()) - plpy.execute(sql) - - sql = """ - CREATE TABLE {rt_edge_table} AS - SELECT (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__ - FROM ( - SELECT {schema_madlib}.unpack_2d(find_core_points) AS unpacked_2d - FROM {find_core_points_table} - ) q1 - WHERE (unpacked_2d).src NOT IN (SELECT {id_column} FROM {border_table1}) - {dist_src_sql} - """.format(**locals()) - plpy.execute(sql) - - sql = """ - CREATE TABLE {rt_core_points_table} AS - SELECT DISTINCT(__src__) AS {id_column} FROM {rt_edge_table} - """.format(**locals()) - plpy.execute(sql) - - # # Start border - sql = """ - CREATE TABLE {border_edge_table} AS - SELECT __src__, __dest__ FROM ( - SELECT __t1__.{id_column} AS __src__, - __t2__.{id_column} AS __dest__, - {schema_madlib}.{metric}( - __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__ - FROM {border_table1} AS __t1__, {border_table2} AS __t2__)q1 - WHERE __dist__ < {eps} - """.format(**locals()) - plpy.execute(sql) - - sql = """ - CREATE TABLE {border_core_points_table} AS - SELECT * FROM ( - SELECT __src__ AS {id_column}, count(*) AS __count__ - FROM {border_edge_table} GROUP BY __src__) q1 - WHERE __count__ >= {min_samples} - {dist_id_sql} - """.format(**locals()) - plpy.execute(sql) - - # Build common tables - sql = """ - CREATE TABLE {distance_table} AS - SELECT * FROM {rt_edge_table} - UNION - SELECT * FROM {border_edge_table} - """.format(**locals()) - plpy.execute(sql) - sql = """ - CREATE TABLE {core_points_table} AS - SELECT {id_column} FROM {border_core_points_table} - UNION - SELECT {id_column} FROM {rt_core_points_table} - """.format(**locals()) - plpy.execute(sql) - - sql = """ - CREATE TABLE {core_edge_table} AS - SELECT __t1__.__src__, __t1__.__dest__ - FROM {rt_edge_table} __t1__ , - (SELECT array_agg({id_column}) AS arr - FROM {core_points_table}) __t2__ - WHERE __t1__.__dest__ = ANY(arr) - UNION - SELECT __t1__.__src__, __t1__.__dest__ - FROM 
{border_edge_table} __t1__ , - (SELECT array_agg({id_column}) AS arr - FROM {core_points_table}) __t2__ - WHERE __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr) - """.format(**locals()) - plpy.execute(sql) - - plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}".format( - find_core_points_table, rt_edge_table, rt_core_points_table, - border_core_points_table, border_edge_table, - cur_source_table, border_table1, border_table2)) + if algorithm == METHOD_OPTIMIZED: + dbscan_opt = DBSCAN_optimized(schema_madlib, source_view, output_table, + eps, min_samples, metric, max_depth) + dbscan_opt.run() else: - # Calculate pairwise distances sql = """ - CREATE TABLE {distance_table} AS + CREATE TEMP TABLE {distance_table} AS SELECT __src__, __dest__ FROM ( - SELECT __t1__.{id_column} AS __src__, - __t2__.{id_column} AS __dest__, + SELECT __t1__.id AS __src__, + __t2__.id AS __dest__, {schema_madlib}.{metric}( - __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__ + __t1__.point, __t2__.point) AS __dist__ FROM {source_view} AS __t1__, {source_view} AS __t2__)q1 WHERE __dist__ < {eps} - {dist_src_sql} + {dist_by_src} """.format(**locals()) plpy.execute(sql) # Find core points sql = """ - CREATE TABLE {core_points_table} AS + CREATE TEMP TABLE {core_points_table} AS SELECT * FROM ( - SELECT __src__ AS {id_column}, count(*) AS __count__ + SELECT __src__ AS id, count(*) AS __count__ FROM {distance_table} GROUP BY __src__) q1 WHERE __count__ >= {min_samples} - {dist_id_sql} + {dist_by_id} """.format(**locals()) plpy.execute(sql) # Find the connections between core points to form the clusters sql = """ - CREATE TABLE {core_edge_table} AS + CREATE TEMP TABLE {core_edge_table} AS SELECT __src__, __dest__ - FROM {distance_table} AS __t1__, (SELECT array_agg({id_column}) AS arr + FROM {distance_table} AS __t1__, (SELECT array_agg(id) AS arr FROM {core_points_table}) __t2__ WHERE __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr) - {dist_src_sql} + 
{dist_by_src} """.format(**locals()) plpy.execute(sql) - sql = """ - SELECT count(*) FROM {core_points_table} - """.format(**locals()) - core_count = plpy.execute(sql)[0]['count'] - _assert(core_count != 0, "DBSCAN: Cannot find any core points/clusters.") - - # Start snowflake creation - if is_platform_pg(): sql = """ - SELECT count(*) FROM {core_edge_table} + SELECT count(*) FROM {core_points_table} """.format(**locals()) - count = plpy.execute(sql)[0]['count'] + core_count = plpy.execute(sql)[0]['count'] + _assert(core_count != 0, "DBSCAN: Cannot find any core points/clusters.") - counts_list = [int(count)] - seg_id = 0 - group_by_clause = '' - dist_clause = '' + # Start snowflake creation + if is_platform_pg(): + sql = """ + SELECT count(*) FROM {core_edge_table} + """.format(**locals()) + count = plpy.execute(sql)[0]['count'] - else: - sql = """ - SELECT count(*), gp_segment_id FROM {core_edge_table} GROUP BY gp_segment_id - """.format(**locals()) - count_result = plpy.execute(sql) - seg_num = get_seg_number() - counts_list = [0]*seg_num - for i in count_result: - counts_list[int(i['gp_segment_id'])] = int(i['count']) - seg_id = 'gp_segment_id' - group_by_clause = 'GROUP BY gp_segment_id' - dist_clause = 'DISTRIBUTED BY (seg_id)' - - snowflake_table = unique_string(desp='snowflake_table') - sf_edge_table = unique_string(desp='sf_edge_table') - - plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format( - snowflake_table, sf_edge_table)) - sql = """ - CREATE TABLE {snowflake_table} AS - SELECT {seg_id}::INTEGER AS seg_id, - {schema_madlib}.build_snowflake_table( __src__, - __dest__, - ARRAY{counts_list}, - {seg_id} - ) AS __sf__ - FROM {core_edge_table} {group_by_clause} - {dist_clause} - """.format(**locals()) - plpy.execute(sql) + counts_list = [int(count)] + seg_id = 0 + group_by_clause = '' - sql = """ - CREATE TABLE {sf_edge_table} AS - SELECT seg_id, (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__ - FROM ( - SELECT seg_id, - 
{schema_madlib}.unpack_2d(__sf__) AS unpacked_2d - FROM {snowflake_table} - ) q1 - {dist_clause} + else: + sql = """ + SELECT count(*), gp_segment_id FROM {core_edge_table} GROUP BY gp_segment_id + """.format(**locals()) + count_result = plpy.execute(sql) + seg_num = get_seg_number() + counts_list = [0]*seg_num + for i in count_result: + counts_list[int(i['gp_segment_id'])] = int(i['count']) + seg_id = 'gp_segment_id' + group_by_clause = 'GROUP BY gp_segment_id' + + snowflake_table = unique_string(desp='snowflake_table') + sf_edge_table = unique_string(desp='sf_edge_table') + + plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format( + snowflake_table, sf_edge_table)) + sql = """ + CREATE TEMP TABLE {snowflake_table} AS + SELECT {seg_id}::INTEGER AS seg_id, + {schema_madlib}.build_snowflake_table( __src__, + __dest__, + ARRAY{counts_list}, + {seg_id} + ) AS __sf__ + FROM {core_edge_table} {group_by_clause} + {dist_by_seg_id} """.format(**locals()) - plpy.execute(sql) + plpy.execute(sql) - # Run wcc to get the min id for each cluster - wcc(schema_madlib, core_points_table, id_column, sf_edge_table, - 'src=__src__, dest=__dest__', output_table, None) + sql = """ + CREATE TEMP TABLE {sf_edge_table} AS + SELECT seg_id, (edge).src AS __src__, (edge).dest AS __dest__ + FROM ( + SELECT seg_id, + {schema_madlib}.__dbscan_unpack_edge(__sf__) AS edge + FROM {snowflake_table} + ) q1 + {dist_by_seg_id} + """.format(**locals()) + plpy.execute(sql) - plpy.execute(""" - ALTER TABLE {0} - ADD COLUMN is_core_point BOOLEAN, - ADD COLUMN __points__ DOUBLE PRECISION[] - """.format(output_table)) - plpy.execute(""" - ALTER TABLE {0} - RENAME COLUMN component_id TO cluster_id - """.format(output_table)) - plpy.execute(""" - UPDATE {0} - SET is_core_point = TRUE - """.format(output_table)) + # Run wcc to get the min id for each cluster + wcc(schema_madlib, core_points_table, 'id', sf_edge_table, + 'src=__src__, dest=__dest__', output_table, None) + + plpy.execute(""" + ALTER TABLE {0} + ADD 
COLUMN is_core_point BOOLEAN, + ADD COLUMN point DOUBLE PRECISION[]; + """.format(output_table) + ) + plpy.execute(""" + ALTER TABLE {0} + RENAME COLUMN component_id TO cluster_id; + """.format(output_table) + ) + plpy.execute(""" + UPDATE {0} + SET is_core_point = TRUE + """.format(output_table) + ) + + # Find reachable points + reachable_points_table = unique_string(desp='reachable_points_table') + plpy.execute("DROP TABLE IF EXISTS {0}".format(reachable_points_table)) + sql = """ + CREATE TEMP TABLE {reachable_points_table} AS + SELECT array_agg(__src__) AS __src_list__, + __dest__ AS __reachable_id__ + FROM {distance_table} AS __t1__, + (SELECT array_agg(id) AS __arr__ + FROM {core_points_table}) __t2__ + WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__) + GROUP BY __dest__ + {dist_by_reach_id} + """.format(**locals()) + plpy.execute(sql) - # Find reachable points - reachable_points_table = unique_string(desp='reachable_points_table') - plpy.execute("DROP TABLE IF EXISTS {0}".format(reachable_points_table)) - sql = """ - CREATE TABLE {reachable_points_table} AS - SELECT array_agg(__src__) AS __src_list__, - __dest__ AS __reachable_id__ - FROM {distance_table} AS __t1__, - (SELECT array_agg({id_column}) AS __arr__ - FROM {core_points_table}) __t2__ - WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__) - GROUP BY __dest__ - {dist_reach_sql} - """.format(**locals()) - plpy.execute(sql) + sql = """ + INSERT INTO {output_table} + SELECT __reachable_id__, + cluster_id, + FALSE AS is_core_point, + NULL AS point + FROM {reachable_points_table} AS __t1__ JOIN + {output_table} AS __t2__ + ON (__src_list__[1] = id) + """.format(**locals()) + plpy.execute(sql) - sql = """ - INSERT INTO {output_table} - SELECT __reachable_id__ as {id_column}, - cluster_id, - FALSE AS is_core_point, - NULL AS __points__ - FROM {reachable_points_table} AS __t1__ INNER JOIN - {output_table} AS __t2__ - ON (__src_list__[1] = {id_column}) + # Add features of points to the output 
table to use them for prediction + sql = """ + UPDATE {output_table} AS __t1__ + SET point = __t2__.point + FROM {source_view} AS __t2__ + WHERE __t1__.id = __t2__.id """.format(**locals()) - plpy.execute(sql) + plpy.execute(sql) - # Add features of points to the output table to use them for prediction - sql = """ - UPDATE {output_table} AS __t1__ - SET __points__ = {expr_point} - FROM {source_view} AS __t2__ - WHERE __t1__.{id_column} = __t2__.{id_column} - """.format(**locals()) - plpy.execute(sql) + plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format( + distance_table, core_points_table, core_edge_table, + reachable_points_table, snowflake_table, sf_edge_table)) - # Update the cluster ids to be consecutive + plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view)) + + # -- Update the cluster ids to be consecutive -- sql = """ UPDATE {output_table} AS __t1__ SET cluster_id = new_id-1 @@ -376,127 +278,658 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, """.format(**locals()) plpy.execute(sql) + # -- Generate output summary table --- + output_summary_table = add_postfix(output_table, '_summary') - # Drop the summary table from wcc plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table)) sql = """ CREATE TABLE {output_summary_table} AS - SELECT '{id_column}'::VARCHAR AS id_column, + SELECT $${id_column}$$::VARCHAR AS id_column, + $${expr_point}$$::VARCHAR AS expr_point, {eps}::DOUBLE PRECISION AS eps, + {min_samples}::BIGINT AS min_points, '{metric}'::VARCHAR AS metric """.format(**locals()) plpy.execute(sql) - plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view)) - plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format( - distance_table, core_points_table, core_edge_table, - reachable_points_table, snowflake_table, sf_edge_table)) - -def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps, - min_samples, metric, depth): - - n_features = 
num_features(source_table, expr_point) - - # If squared_dist_norm2 is used, we assume eps is set for the squared distance - # That means the border only needs to be sqrt(eps) wide - local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps - - kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree( - schema_madlib, source_table, expr_point, depth, n_features, local_eps) +class DBSCAN(object): + def __init__(self, schema_madlib, source_table, output_table, + eps, min_samples, metric): + """ + Args: + @param schema_madlib Name of the Madlib Schema + @param source_table Training data table + @param output_table Output table of points labelled by cluster_id + @param eps The eps value defined by the user + @param min_samples min_samples defined by user + @param metric metric selected by user + + """ + self.schema_madlib = schema_madlib + self.source_table = source_table + self.output_table = output_table + self.n_dims = num_features(source_table, 'point') + self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)' + self.min_samples = min_samples + self.metric = metric + # If squared_dist_norm2 is used, we assume eps is set for the squared + # distance. 
That means the border width must be sqrt(eps) + self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps + +class DBSCAN_optimized(DBSCAN): + def __init__(self, schema_madlib, source_table, output_table, + eps, min_samples, metric, max_depth=None): + """ + Args: + @param schema_madlib Name of the Madlib Schema + @param source_table Training data table + @param output_table Output table of points labelled by cluster_id + @param eps The eps value defined by the user + @param min_samples min_samples defined by user + @param metric metric selected by user + """ + + DBSCAN.__init__(self, schema_madlib, source_table, output_table, + eps, min_samples, metric) + self.max_depth = max_depth + self.unique_str = unique_string('DBSCAN_opt') + self.num_segs = get_seg_number() + res = plpy.execute("SELECT COUNT(*) FROM {source_table}".format(**locals())) + self.num_points = res[0]['count'] + + def run(self): + unique_str = self.unique_str + distributed_by = self.distributed_by + schema_madlib = self.schema_madlib + + self.dist_map = 'dist_map' + unique_str + + segmented_source = 'segmented_source' + unique_str + self.segmented_source = segmented_source + self.build_optimized_tree() + + rowcount_table = 'rowcounts' + unique_str + leaf_clusters_table = 'leaf_clusters' + unique_str + + count_rows_sql = """ + DROP TABLE IF EXISTS {rowcount_table}; + CREATE TEMP TABLE {rowcount_table} AS + WITH i AS ( + SELECT + count(*) AS num_rows, + dist_id, + __dist_id__ + FROM internal_{segmented_source} + GROUP BY dist_id, __dist_id__ + ), e AS ( + SELECT + count(*) AS num_rows, + dist_id, + __dist_id__ + FROM external_{segmented_source} + GROUP BY dist_id, __dist_id__ + ) + SELECT + i.num_rows AS num_internal_rows, + COALESCE(e.num_rows,0) AS num_external_rows, + i.dist_id, + __dist_id__ + FROM i LEFT JOIN e + USING (dist_id, __dist_id__) + {distributed_by} + """.format(**locals()) + plpy.execute(count_rows_sql) + + # Run dbscan in parallel on each leaf + dbscan_leaves_sql = """ + CREATE TEMP 
TABLE {leaf_clusters_table} AS + WITH input AS ( + SELECT * + FROM internal_{segmented_source} + UNION ALL + SELECT * + FROM external_{segmented_source} + ), segmented_source AS ( + SELECT + ROW( + id, + point, + FALSE, + 0, + leaf_id, + dist_id + )::{schema_madlib}.__dbscan_record AS rec, + __dist_id__ + FROM input + ) + SELECT (output).*, __dist_id__ FROM ( + SELECT + {schema_madlib}.__dbscan_leaf( + rec, + {self.eps}, + {self.min_samples}, + '{self.metric}', + num_internal_rows, + num_external_rows + ) AS output, + __dist_id__ + FROM segmented_source JOIN {rowcount_table} + USING (__dist_id__) + ) a {distributed_by} + """.format(**locals()) + DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True) - kd_source_table = unique_string(desp='kd_source_table') - kd_border_table1 = unique_string(desp='kd_border_table1') - kd_border_table2 = unique_string(desp='kd_border_table2') + plpy.execute(""" + DROP TABLE IF EXISTS + internal_{segmented_source}, + external_{segmented_source} + """.format(**locals())) + + cluster_map = 'cluster_map' + unique_str + noise_point = label.NOISE_POINT + + # Replace local cluster_id's in each leaf with + # global cluster_id's which are unique across all leaves + gen_cluster_map_sql = """ + CREATE TEMP TABLE {cluster_map} AS + SELECT + dist_id, + __dist_id__, + cluster_id AS local_id, + ROW_NUMBER() OVER( + ORDER BY dist_id, cluster_id + ) AS global_id + FROM {leaf_clusters_table} + WHERE dist_id = leaf_id AND cluster_id != {noise_point} + GROUP BY __dist_id__, dist_id, cluster_id + {distributed_by} + """.format(**locals()) + plpy.execute(gen_cluster_map_sql) + + globalize_cluster_ids_sql = """ + UPDATE {leaf_clusters_table} lc + SET cluster_id = global_id + FROM {cluster_map} cm WHERE + cm.dist_id = lc.dist_id AND + cluster_id = local_id; + """.format(**locals()) + DEBUG.plpy.execute(globalize_cluster_ids_sql) + + intercluster_edges = 'intercluster_edges' + unique_str + dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED 
BY (src)' + + # Build intercluster table of edges connecting clusters + # on different leaves + intercluster_edge_sql = """ + CREATE TEMP TABLE {intercluster_edges} AS + SELECT + internal.cluster_id AS src, + external.cluster_id AS dest + FROM {leaf_clusters_table} internal + JOIN {leaf_clusters_table} external + USING (id) + WHERE internal.is_core_point + AND internal.leaf_id = internal.dist_id + AND external.leaf_id != external.dist_id + GROUP BY src,dest + {dist_by_src} + """.format(**locals()) + DEBUG.plpy.execute(intercluster_edge_sql) + + res = plpy.execute("SELECT count(*) FROM {intercluster_edges}".format(**locals())) + + if int(res[0]['count']) > 0: + wcc_table = 'wcc' + unique_str + plpy.execute(""" + DROP TABLE IF EXISTS + {wcc_table}, {wcc_table}_summary, + {self.output_table} + """.format(**locals())) + + # Find connected components of intercluster_edge_table + wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges, + 'src=src,dest=dest', wcc_table, None) + + # Rename each cluster_id to its component id in the intercluster graph + merge_clusters_sql = """ + UPDATE {leaf_clusters_table} lc + SET cluster_id = wcc.component_id + FROM {wcc_table} wcc + WHERE lc.cluster_id = wcc.global_id + """.format(**locals()) + plpy.execute(merge_clusters_sql) - dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)' - plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, kd_border_table1, kd_border_table2)) + plpy.execute(""" + DROP TABLE IF EXISTS + {wcc_table}, {wcc_table}_summary + """.format(**locals())) - output_sql = """ - CREATE TABLE {kd_source_table} AS - SELECT *, - CASE {case_when_clause} END AS __leaf_id__ - FROM {source_table} - {dist_leaf_sql} + add_crossborder_points_sql = """ + CREATE TABLE {self.output_table} AS + SELECT + id, point, cluster_id, is_core_point + FROM ( + SELECT + id, + point, + is_core_point, + leaf_id = dist_id AS is_internal, + dist_id, + CASE WHEN cluster_id = {noise_point} + THEN 
MAX(cluster_id) OVER(PARTITION BY id) + ELSE cluster_id + END + FROM {leaf_clusters_table} + ) x WHERE is_internal AND cluster_id != {noise_point} """.format(**locals()) - plpy.execute(output_sql) + DEBUG.plpy.execute(add_crossborder_points_sql) - border_sql = """ - CREATE TABLE {kd_border_table1} AS - SELECT * - FROM {source_table} - WHERE {border_cl1} + plpy.execute(""" + DROP TABLE IF EXISTS {rowcount_table}, + {leaf_clusters_table}, {cluster_map}, + {intercluster_edges} + """.format(**locals())) + + def build_optimized_tree(self): + eps = self.eps + num_segs = self.num_segs + num_points = self.num_points + target = num_points / num_segs + source_table = self.source_table + leaf_bounds_table = 'leaf_bounds' + self.unique_str + max_depth = self.max_depth + n_dims = self.n_dims + dist_map = self.dist_map + distributed_by = self.distributed_by + internal_points = 'internal_' + self.segmented_source + external_points = 'external_' + self.segmented_source + + next_cut = 'next_cut' + self.unique_str + prev_node = 'prev_node' + self.unique_str + + first_cut_sql = """ + DROP TABLE IF EXISTS {prev_node}; + CREATE TEMP TABLE {prev_node} AS + WITH world_bounds AS ( + SELECT + i, + min(point[i]) AS lower, + max(point[i])+{eps} AS upper, + max(point[i])+{eps} - min(point[i]) AS size, + floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins + FROM generate_series(1,{n_dims}) AS i, {source_table} + GROUP BY i + ), + density_map AS ( + SELECT i, eps_bin, + eps_bins, + COALESCE(density, 0) AS density + FROM ( + SELECT i, eps_bins, + generate_series(0, eps_bins - 1) AS eps_bin + FROM world_bounds + ) g LEFT JOIN ( + SELECT i, + floor((point[i] - lower)/{eps})::INT AS eps_bin, + eps_bins, + COUNT(*) AS density + FROM + world_bounds, + {source_table} + GROUP BY i, eps_bin, eps_bins + ) a + USING (i, eps_bin, eps_bins) + ), + loss_table AS ( + SELECT i, eps_bin, + left_internal, + {num_points} as points_in_node, + left_external, + right_external, + 
{self.schema_madlib}.__dbscan_segmentation_loss( + left_internal, + {num_points} - left_internal, + left_external, + right_external, + {num_segs}::BIGINT, + V, + {eps}::DOUBLE PRECISION, + {n_dims}::BIGINT, + eps_bin::DOUBLE PRECISION, + eps_bins::DOUBLE PRECISION + ) AS losses + FROM ( + SELECT i, eps_bin, eps_bins, + {num_segs}::BIGINT AS num_segs, + COALESCE(density, 0) AS right_external, + COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external, + SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal + FROM ( + SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds + ) g LEFT JOIN density_map USING(i, eps_bin) + ) params, + ( + SELECT {self.schema_madlib}.prod(size) AS V + FROM world_bounds + ) AS volume + ), + optimize AS ( + SELECT i, lower, cutoff, upper, + left_avail, right_avail, + left_internal, right_internal + FROM ( + SELECT + i, + CASE WHEN (losses).right_avail = 0 + THEN + points_in_node + ELSE + left_internal + END AS left_internal, + CASE WHEN (losses).right_avail = 0 + THEN + 0::BIGINT + ELSE + points_in_node - left_internal + END AS right_internal, + (losses).left_avail AS left_avail, + (losses).right_avail AS right_avail, + GREATEST((losses).left_loss,(losses).right_loss) AS total_loss, + min( + GREATEST((losses).left_loss,(losses).right_loss) + ) OVER() AS total_min, + wb.lower, + CASE WHEN (losses).right_avail = 0 + THEN + wb.upper + ELSE + wb.lower + {eps}*(eps_bin+1) + END AS cutoff, + wb.upper + FROM loss_table JOIN world_bounds wb USING(i) + ) a WHERE total_loss = total_min LIMIT 1 + ) + SELECT s.id, + ARRAY[i] AS coords, + CASE WHEN s.point[i] < cutoff + THEN left_avail + ELSE right_avail + END AS avail_segs, + CASE WHEN point[i] < cutoff + THEN left_internal + ELSE right_internal + END AS points_in_node, + CASE WHEN s.point[i] < cutoff + THEN ARRAY[lower] + ELSE ARRAY[cutoff] + END AS lower_bounds, + CASE WHEN s.point[i] < cutoff + THEN ARRAY[cutoff] + ELSE ARRAY[upper] + 
END AS upper_bounds, + (s.point[i] >= cutoff)::INT AS node_id + FROM optimize, {source_table} s """.format(**locals()) - plpy.execute(border_sql) - - border_sql = """ - CREATE TABLE {kd_border_table2} AS - SELECT * + plpy.execute(first_cut_sql) + + segment_source_sql = """ + CREATE TEMP TABLE {next_cut} AS + WITH node_bounds AS ( + SELECT + i, + node_id, + avail_segs, + points_in_node, + min(point[i]) AS lower, + max(point[i])+{eps} AS upper, + max(point[i])+{eps} - min(point[i]) AS size, + floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins + FROM generate_series(1, {n_dims}) AS i, + {prev_node} JOIN {source_table} USING (id) + GROUP BY 1,2,3,4 + ), density_map AS ( + SELECT i, node_id, eps_bin, + COALESCE( density, 0) AS density + FROM ( + SELECT node_id, i, + generate_series(0, eps_bins - 1) AS eps_bin + FROM node_bounds + ) g LEFT JOIN ( + SELECT node_id, i, + floor((point[i] - lower)/{eps})::INT AS eps_bin, + COUNT(*) AS density + FROM + node_bounds JOIN {prev_node} USING (node_id) + JOIN {source_table} USING (id) + GROUP BY node_id, i, eps_bin + ) a + USING(i, node_id, eps_bin) + ), params AS ( + SELECT i, node_id, eps_bin, eps_bins, + COALESCE(density, 0) AS right_external, + COALESCE(LEAD(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin), 0) AS left_external, + SUM(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin)::BIGINT AS left_internal + FROM ( + SELECT i, node_id, eps_bins, generate_series(0, eps_bins - 1) AS eps_bin FROM node_bounds + ) g LEFT JOIN density_map USING(i, node_id, eps_bin) + ), volume AS ( + SELECT + node_id, + avail_segs, + points_in_node, + {self.schema_madlib}.prod(size) AS V + FROM node_bounds + WHERE i=1 + GROUP BY 1,2,3 + ), loss_table AS ( + SELECT i, node_id, eps_bin, + left_internal, + points_in_node - left_internal AS right_internal, + left_external, + right_external, + {self.schema_madlib}.__dbscan_segmentation_loss( + left_internal, + points_in_node, + left_external, + right_external, + avail_segs, 
+ V, + {eps}::DOUBLE PRECISION, + {n_dims}::BIGINT, + eps_bin::DOUBLE PRECISION, + eps_bins::DOUBLE PRECISION + ) AS losses + FROM params JOIN volume USING (node_id) + ), optimize AS ( + SELECT * FROM ( + SELECT + node_id, + i, + (losses).left_avail, + (losses).right_avail, + CASE WHEN (losses).right_avail = 0 + THEN + points_in_node + ELSE + left_internal + END AS left_internal, + CASE WHEN (losses).right_avail = 0 + THEN + 0::BIGINT + ELSE + points_in_node - left_internal + END AS right_internal, + node_bounds.lower, + CASE WHEN (losses).right_avail = 0 + THEN + node_bounds.upper + ELSE + node_bounds.lower + {eps}*(eps_bin+1) + END AS cutoff, + node_bounds.upper, + ROW_NUMBER() OVER( + PARTITION BY node_id + ORDER BY GREATEST((losses).left_loss,(losses).right_loss) + ) AS loss_rank + FROM loss_table JOIN node_bounds USING(i, node_id) + ) a WHERE loss_rank = 1 + ) + SELECT id, coords || i AS coords, + CASE WHEN point[i] < opt.cutoff + THEN left_avail + ELSE right_avail + END AS avail_segs, + CASE WHEN point[i] < opt.cutoff + THEN left_internal + ELSE right_internal + END AS points_in_node, + lower_bounds || + CASE WHEN point[i] < opt.cutoff + THEN + lower + ELSE + opt.cutoff + END AS lower_bounds, + opt.cutoff, + upper_bounds || + CASE WHEN point[i] < opt.cutoff + THEN + opt.cutoff + ELSE + upper + END AS upper_bounds, + (node_id << 1) | (point[i] >= opt.cutoff)::INT AS node_id FROM {source_table} - WHERE {border_cl2} + JOIN {prev_node} + USING (id) + JOIN optimize opt + USING (node_id) """.format(**locals()) - plpy.execute(border_sql) - return kd_source_table, kd_border_table1, kd_border_table2 + for i in range(max_depth): + DEBUG.plpy.info(segment_source_sql) + plpy.execute(segment_source_sql) + plpy.execute(""" + DROP TABLE IF EXISTS {prev_node}; + ALTER TABLE {next_cut} RENAME TO {prev_node} + """.format(**locals())) + if is_platform_pg(): + # For postgres, there should only be 1 node, + # so we assign it dist_id = __dist_id__ = 0 + plpy.execute(""" + CREATE 
TABLE {dist_map} AS + SELECT 0::BIGINT AS dist_id, 0::BIGINT as __dist_id__ + """.format(**locals())) + else: + # Create implicit many-to-1 map from __dist_id__ to seg_id + temp_dist_table = 'dist_table' + self.unique_str + create_dist_table_sql = """ + CREATE TEMP TABLE {temp_dist_table} AS + SELECT generate_series(1,10000) AS __dist_id__ + {distributed_by} + """.format(**locals()) + plpy.execute(create_dist_table_sql) + + # Create a 1-1 map from dist_id to __dist_id__, + # where dist_id's are re-ordered so they are evenly + # distributed among segments. This should work even + # if there are gaps in the dist_id assignments (some + # dist_id's missing) + leaves_sql = """ + SELECT + node_id AS dist_id, + ROW_NUMBER() OVER() AS leaf_num + FROM {prev_node} + GROUP BY node_id + """.format(**locals()) + leaves_cnt = plpy.execute(""" + WITH leaves as ({leaves_sql}) + SELECT COUNT(*) cnt from leaves + """.format(**locals()))[0]['cnt'] + create_dist_map_sql = """ + CREATE TEMP TABLE {dist_map} AS + WITH leaves as ( + {leaves_sql} + ), segmap AS ( + SELECT + gp_segment_id AS seg_id, + ROW_NUMBER() OVER( + ORDER BY ctid, gp_segment_id + ) AS leaf_num, + __dist_id__ + FROM {temp_dist_table} + LIMIT {leaves_cnt} + ) + SELECT dist_id, __dist_id__ + FROM leaves JOIN segmap + USING (leaf_num) + {distributed_by} + """.format(**locals()) + plpy.execute(create_dist_map_sql) -def build_kd_tree(schema_madlib, source_table, expr_point, - depth, n_features, eps, **kwargs): - """ - KD-tree function to create a partitioning - Args: - @param schema_madlib Name of the Madlib Schema - @param source_table Training data table - @param output_table Name of the table to store kd tree - @param expr_point Name of the column with training data - or expression that evaluates to a - numeric array - @param depth Depth of the kd tree - @param n_features Number of features - @param eps The eps value defined by the user - """ - with MinWarning("error"): - - clauses = [' TRUE '] - border_cl1 = ' FALSE ' 
- border_cl2 = ' FALSE ' - clause_counter = 0 - kd_array = [] - for curr_level in range(depth): - curr_feature = (curr_level % n_features) + 1 - for curr_leaf in range(pow(2,curr_level)): - clause = clauses[clause_counter] - cutoff_sql = """ - SELECT percentile_disc(0.5) - WITHIN GROUP ( - ORDER BY ({expr_point})[{curr_feature}] - ) AS cutoff - FROM {source_table} - WHERE {clause} - """.format(**locals()) + plpy.execute(""" + DROP TABLE IF EXISTS {temp_dist_table} + """.format(**locals())) - cutoff = plpy.execute(cutoff_sql)[0]['cutoff'] - cutoff = "NULL" if cutoff is None else cutoff - kd_array.append(cutoff) - clause_counter += 1 - clauses.append(clause + - "AND ({expr_point})[{curr_feature}] < {cutoff} ". - format(**locals())) - clauses.append(clause + - "AND ({expr_point})[{curr_feature}] >= {cutoff} ". - format(**locals())) - border_cl1 = border_cl1 + """ OR (({expr_point})[{curr_feature}] >= {cutoff} - {eps} - AND ({expr_point})[{curr_feature}] <= {cutoff} + {eps}) - """.format(**locals()) - border_cl2 = border_cl2 + """ OR (({expr_point})[{curr_feature}] >= {cutoff} - (2*{eps}) - AND ({expr_point})[{curr_feature}] <= {cutoff} + (2*{eps})) - """.format(**locals()) - - n_leaves = pow(2, depth) - case_when_clause = '\n'.join(["WHEN {0} THEN {1}::INTEGER".format(cond, i) - for i, cond in enumerate(clauses[-n_leaves:])]) - return kd_array, case_when_clause, border_cl1, border_cl2 + gen_internal_points_sql = """ + CREATE TEMP TABLE {internal_points} AS + SELECT id, point, node_id AS leaf_id, node_id AS dist_id, __dist_id__ + FROM {source_table} + JOIN {prev_node} + USING (id) + JOIN {dist_map} + ON node_id = dist_id + {distributed_by} + """.format(**locals()) + DEBUG.plpy.execute(gen_internal_points_sql) + plpy.execute(""" + DROP TABLE IF EXISTS {prev_node} + """.format(**locals())) + + generate_leaf_bounds_sql = """ + CREATE TEMP TABLE {leaf_bounds_table} AS + SELECT + array_agg(min ORDER BY i) AS leaf_lower, + array_agg(max ORDER BY i) AS leaf_upper, + 
array_agg(min - {eps} ORDER BY i) AS dist_lower, + array_agg(max + {eps} ORDER BY i) AS dist_upper, + leaf_id, + __dist_id__ + FROM ( + SELECT + leaf_id, + i, + min(point[i]), + max(point[i]), + count(*) + FROM {internal_points}, generate_series(1, {n_dims}) AS i + GROUP BY leaf_id, i + ) x + JOIN {dist_map} + ON leaf_id = dist_id + GROUP BY leaf_id, __dist_id__ + {distributed_by} + """.format(**locals()) + DEBUG.plpy.execute(generate_leaf_bounds_sql) + + gen_external_points_sql = """ + CREATE TEMP TABLE {external_points} AS + SELECT i.id, i.point, i.leaf_id, b.leaf_id AS dist_id, b.__dist_id__ + FROM {leaf_bounds_table} b JOIN {internal_points} i + ON b.leaf_id != i.leaf_id + WHERE 0 <= all({self.schema_madlib}.array_sub(point, dist_lower)) AND + 0 < all({self.schema_madlib}.array_sub(dist_upper, point)) + {distributed_by} + """.format(**locals()) + plpy.execute(gen_external_points_sql) + + plpy.execute(""" + DROP TABLE IF EXISTS + {leaf_bounds_table}, {dist_map} + """.format(**locals())) def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, expr_point, output_table, **kwargs): @@ -504,108 +937,472 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, with MinWarning("warning"): _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, - expr_point, output_table) + expr_point, output_table) dbscan_summary_table = add_postfix(dbscan_table, '_summary') summary = plpy.execute("SELECT * FROM {0}".format(dbscan_summary_table))[0] eps = summary['eps'] metric = summary['metric'] - db_id_column = summary['id_column'] sql = """ CREATE TABLE {output_table} AS - SELECT __q1__.{id_column}, cluster_id, distance + WITH src AS ( + SELECT ({id_column}) AS id, ({expr_point}) AS point FROM {source_table} + ) + SELECT __q1__.id, cluster_id, distance FROM ( - SELECT __t2__.{id_column}, cluster_id, - min({schema_madlib}.{metric}(__t1__.__points__, - __t2__.{expr_point})) as distance - FROM {dbscan_table} AS __t1__, 
{source_table} AS __t2__ + SELECT __t2__.id, cluster_id, + min({schema_madlib}.{metric}(__t1__.point, + __t2__.point)) as distance + FROM {dbscan_table} AS __t1__, src AS __t2__ WHERE is_core_point = TRUE - GROUP BY __t2__.{id_column}, cluster_id - ) __q1__ - WHERE distance <= {eps} + GROUP BY __t2__.id, cluster_id + ) __q1__ WHERE distance <= {eps} """.format(**locals()) result = plpy.execute(sql) -def find_core_points_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs): - - SD = kwargs['SD'] - if not state: - data = {} - SD['counter{0}'.format(leaf_id)] = 0 +class label: + UNLABELLED = 0 + NOISE_POINT = -1 + +pstats = None +class dbscan_perfstats: + def __init__(self): + self.range_query_calls = 0 + self.range_query_time = 0.0 + self.range_query_candidates = 0 + self.range_query_neighbors = 0 + + def show(self, msg): + DEBUG.plpy.info("{}_stats: range_query total calls = {}" + .format(msg, self.range_query_calls)) + + range_query_calls = self.range_query_calls if self.range_query_calls else 0.00001 + + DEBUG.plpy.info("{}_stats: range_query total time = {}s" + .format(msg, self.range_query_time)) + DEBUG.plpy.info("{}_stats: range_query total candidates = {}" + .format(msg, self.range_query_candidates)) + DEBUG.plpy.info("{}_stats: range_query total neighbors returned = {}" + .format(msg, self.range_query_neighbors)) + DEBUG.plpy.info("{}_stats: range_query avg time = {}s" + .format(msg, self.range_query_time / range_query_calls)) + DEBUG.plpy.info("{}_stats: range_query avg candidates returned = {}" + .format(msg, self.range_query_candidates * 1.0 / range_query_calls)) + DEBUG.plpy.info("{}_stats: range_query avg neighbors returned = {}" + .format(msg, self.range_query_neighbors * 1.0 / range_query_calls)) + self.__init__() # Clear counters and start over + +def within_range_tf(point, candidate_ids, candidate_points, + metric_cutoff, norm=2): + sd = tf.squared_difference(candidate_points, point) + distances = tf.math.reduce_sum(sd, 1) + output = tf.where(distances <= metric_cutoff) + res = output.eval(session=sess) + res.shape = res.shape[0] + return res + +def within_range_np_linalg(point, candidate_ids, candidate_points, + eps, norm=2): + distances = np.linalg.norm(candidate_points - point, ord=norm, axis=1) + return candidate_ids[distances <= eps] + +def within_range_np(point, candidate_ids, candidate_points, + metric_cutoff, norm=2): + if norm==1: + distances = np.sum(np.abs(candidate_points - point), axis=1) else: - data = SD['data{0}'.format(leaf_id)] + distances = np.sum(np.square(candidate_points - point), axis=1) + return candidate_ids[distances <= metric_cutoff] - data[id_in] = expr_points - SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1 - SD['data{0}'.format(leaf_id)] = data - ret = [[-1,-1],[-1,-1]] +class dbscan_record(object): + def __init__(self, obj, eps=None): + if eps is not None: + self._init_from_dict(obj, eps) + else: + self._clone(obj) - my_n_rows = n_rows[leaf_id] + @staticmethod + def from_dict(d, eps): + if d['leaf_id'] == d['dist_id']: + return dbscan_internal_record(d, eps) + else: + return dbscan_external_record(d, eps) + + def _init_from_dict(self, d, eps): + """ + dbscan_record dict constructor + The only reason for passing eps here is so we + can pre-compute the point's neighborhood for later + """ + self.id = d['id'] + self.point = d['point'] + self.is_core_point = False + self.cluster_id = label.UNLABELLED + self.leaf_id = d['leaf_id'] + self.dist_id = d['dist_id'] + + # private members (for convenience, not for output) + p = np.array(self.point) + self._np_point = p + self._bounds = np.concatenate([p, p]) + self._neighborhood = np.concatenate([p - eps, p + eps]) + + def _clone(self, rec): + """ + dbscan_record copy constructor - we call this + just before outputting any row. For external + points, we have to clone it each time so that + we can output multiple rows. 
For internal points, + it's just a quick way of cleaning it up (removing + the extra private vars). + """ + # Copy only public members + self.id = rec.id + self.point = rec.point + self.is_core_point = rec.is_core_point + self.cluster_id = rec.cluster_id + self.leaf_id = rec.leaf_id + self.dist_id = rec.dist_id + + def __repr__(self): + fmt = 'id={},point[{}],cluster_id={},core={},leaf_id={},dist_id={}' + rep = fmt.format(self.id, len(self.point), self.cluster_id, + self.is_core_point, self.leaf_id, self.dist_id) + return 'dbscan_record({})'.format(rep) + + def __eq__(self, other): + if not isinstance(other, dbscan_record): + return False + return self.__dict__ == other.__dict__ + +class dbscan_internal_record(dbscan_record): + def __init__(self, obj, eps=None): + dbscan_record.__init__(self, obj, eps) + + def _init_from_dict(self, d, eps): + """ + private members specific to internal records + """ + dbscan_record._init_from_dict(self, d, eps) + self._external_neighbors = [] + self._internal_neighbor_count = 0 + self._possible_border_points = [] + + def not_core_point(self, min_samples): + """ + Computes self.is_core_point from internal and + external neighbor counts, and returns its negation + """ + ext_neighbors = len(self._external_neighbors) + int_neighbors = self._internal_neighbor_count + num_neighbors = ext_neighbors + int_neighbors + self.is_core_point = (num_neighbors + 1 >= min_samples) + return not self.is_core_point + +class dbscan_external_record(dbscan_record): + def __init__(self, obj, eps=None): + dbscan_record.__init__(self, obj, eps) + + def _init_from_dict(self, d, eps): + """ + private members specific to external records + """ + dbscan_record._init_from_dict(self, d, eps) + self._nearby_clusters = set() + +class LeafStorage: + """ + LeafStorage - if SD['counter{0}'.format(leaf_id)] == my_n_rows: + This is a per-leaf storage class for dbscan_leaf(). 
+ There are 3 containers within LeafStorage: - core_counts = {} - core_lists = {} - p = index.Property() - p.dimension = len(expr_points) - idx = index.Index(properties=p) - ret = [] + rtree - a spatial index of the id's of the leaf's internal points + records - a hash table for looking up dbscan_internal_records by id + external_records - an ordered list of dbscan_external_records - if metric == 'dist_norm1': - fn_dist = distance.cityblock - elif metric == 'dist_norm2': - fn_dist = distance.euclidean + Methods: + + add_point() - adds a point to LeafStorage + range_query() - returns a list of internal point id's within eps of a point + update_neighbors() - updates internal neighbor counts based on a point and + list of neighboring internal points + label_point() - assigns a point to a specific cluster, or marks it as noise + + yield_external_records() - clones and yields a copy of each dbscan_external_record + based on its list of nearby_clusters + """ + def __init__(self, dimension, dtype, eps, metric, min_samples): + self.n_dims = dimension + self.dtype = dtype + self.eps = eps + self.metric = metric + self.min_samples = min_samples + props = index.Property(dimension=dimension) + self.props = props + self.rtree = index.Index(properties=props) + self.records = {} + self.external_records = [] + + def add_point(self, rec): + if rec.leaf_id == rec.dist_id: + self.rtree.add(rec.id, rec._bounds) + self.records[rec.id] = rec else: - fn_dist = distance.sqeuclidean + self.external_records.append(rec) + + def range_query(self, db_rec): + """ + Returns a list of records of all neighboring + points within eps distance from point. Returns None if the + number of results will be < min_results. Also returns None + if number of results > max_results > 0 The special value + max_results = 0 means unlimited results (return all). 
+ """ + global pstats + pstats.range_query_calls += 1 + start_time = time() + + eps = self.eps + metric = self.metric + records = self.records + rtree = self.rtree + + if not metric in ('squared_dist_norm2', 'dist_norm2', 'dist_norm1'): + plpy.error("Sorry, optimized method does not support {} metric yet".format(metric)) + + norm = 1 if metric == 'dist_norm1' else 2 + metric_cutoff = eps if metric == 'dist_norm1' else eps*eps + + if isinstance(db_rec, dbscan_internal_record): + # As long as we update the neighbor counts of + # of a core point's neighbors before returning, + # we can safely remove it from the rtree for + # future searches. If this is a non-core point, + # we still delete it, but add its id to the list of + # _possible_border_points in the db_rec of each + # of its unlabelled neighbors + self.rtree.delete(db_rec.id, db_rec._bounds) + + DEBUG.start_timing('rtree_count') + n = rtree.count(db_rec._neighborhood) + DEBUG.print_timing('rtree_count') + + DEBUG.start_timing('rtree_intersection') + ids = rtree.intersection(db_rec._neighborhood, objects=False) + DEBUG.print_timing('rtree_intersection') + + candidate_ids = np.empty(n, dtype='int64') + candidate_points = np.empty([n, self.n_dims], dtype=self.dtype) + i = 0 + for id in ids: + candidate_ids[i] = id + candidate_points[i] = records[id]._np_point + i += 1 + + # Avoid catastrophic issues which are extremeley difficult to debug + assert i == n + + pstats.range_query_candidates += n + + DEBUG.start_timing('within_range_np_linalg') + neighbors = within_range_np_linalg( + db_rec._np_point, + candidate_ids, + candidate_points, + eps, + norm + ) + DEBUG.print_timing('within_range_np_linalg') + + pstats.range_query_neighbors += len(neighbors) + end_time = time() + pstats.range_query_time += (end_time - start_time) + + if isinstance(db_rec, dbscan_internal_record): + # Necessary because we are deleting it from the rtree + # (won't show up in future searches) + self.update_neighbors(db_rec, neighbors) + 
neighbors = np.concatenate([neighbors, np.array(db_rec._possible_border_points, dtype='int64')]) + elif isinstance(db_rec, dbscan_external_record): + for id in neighbors: + records[id]._external_neighbors.append(db_rec) + else: + raise Exception("range_query() received unexpected type {}".format(type(db_rec))) - for key1, value1 in data.items(): - idx.add(key1,value1+value1,key1) + return neighbors - for key1, value1 in data.items(): + def update_neighbors(self, db_rec, neighbors): + db_rec._internal_neighbor_count += len(neighbors) - v1 = [] - v2 = [] - for coor in value1: - v1.append(coor-eps) - v2.append(coor+eps) + not_core_point = db_rec.not_core_point(self.min_samples) - # Array concat - v = v1+v2 - hits = idx.intersection(v) + for n_id in neighbors: + n_rec = self.records[n_id] + n_rec._internal_neighbor_count += 1 + if not_core_point and n_rec.cluster_id == label.UNLABELLED: + n_rec._possible_border_points.append(db_rec.id) - if key1 not in core_counts: - core_counts[key1] = 0 - core_lists[key1] = [] - for key2 in hits: - value2 = data[key2] - dist = fn_dist(value1, value2) - if dist <= eps: - core_counts[key1] += 1 - core_lists[key1].append(key2) + def label_point(self, rec, cluster): + """ + Labels a point as a member of a particular cluster, or + marks it as noise when cluster = label.NOISE_POINT. 
+ """ + rec.cluster_id = cluster - for key1, value1 in core_counts.items(): - if value1 >= min_samples: - for key2 in core_lists[key1]: - ret.append([key1,key2]) + if isinstance(rec, dbscan_internal_record): + if cluster != label.NOISE_POINT: + # Mark nearby external points as near this cluster + for ext_neighbor in rec._external_neighbors: + ext_neighbor._nearby_clusters.add(cluster) - return ret + def yield_external_records(self): + external_records = self.external_records -def find_core_points_merge(state1, state2, **kwargs): + for rec in external_records: + for cluster_id in rec._nearby_clusters: + rec_clone = dbscan_external_record(rec) + self.label_point(rec_clone, cluster_id) + yield rec_clone - if not state1: - return state2 - elif not state2: - return state1 +DEBUG_WITH_TRACEBACKS = False + +# Just for debugging... so we receive the traceback +def dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, + num_external_points, SD, **kwargs): + res = _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, + num_external_points, SD) + + if DEBUG_WITH_TRACEBACKS: + ret = [] + for i, db_rec in enumerate(res): + ret.append(db_rec) # Returning a generator breaks the traceback forwarding + return ret else: - plpy.error("dbscan Error: A kd-tree leaf should be on a single segment.") + return res +def _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, + num_external_points, SD): + """ + Performs classic dbscan algorithm in parallel on each leaf + """ + global pstats -def find_core_points_final(state, **kwargs): + num_points = num_internal_points + num_external_points - return state + db_rec = dbscan_record.from_dict(db_rec, eps) + id = db_rec.id + dist_id = db_rec.dist_id + if dist_id in SD: + leaf = SD[dist_id] + else: + pstats = dbscan_perfstats() + DEBUG.start_timing("dbscan_leaf") + DEBUG.start_timing("dbscan_leaf_load_rtree") + leaf = LeafStorage( len(db_rec.point), + db_rec._np_point.dtype, + eps, + metric, + min_samples + 
) + SD[dist_id] = leaf + + records = leaf.records + external_records = leaf.external_records + + leaf.add_point(db_rec) + + if len(records) == num_internal_points and \ + len(external_records) == num_external_points: + DEBUG.print_timing("dbscan_leaf_load_rtree") + DEBUG.start_timing("dbscan_leaf_external") + + # Find all internal neighbors of external points + for ext_rec in external_records: + DEBUG.start_timing("external_rangequery") + # We don't need to save the result, we only care about + # the side effect of adding this point to the list of + # neighbor._ext_neighbors for each internal neighbor found + _ = leaf.range_query(ext_rec) + DEBUG.print_timing("external_rangequery") + + DEBUG.print_timing("dbscan_leaf_external") + pstats.show("external") + pstats = dbscan_perfstats() + + # Classify internal points into clusters (main dbscan algorithm), + # and yield resulting internal records + DEBUG.start_timing("dbscan_leaf_internal") + cluster = 0 + for rec in records.values(): + if rec.cluster_id != label.UNLABELLED: + continue + + DEBUG.start_timing('internal_rangequery') + neighbors = leaf.range_query(rec) + DEBUG.print_timing('internal_rangequery') + + if not rec.is_core_point: # rec.is_core_point field will be set inside range_query(rec) + leaf.label_point(rec, label.NOISE_POINT) + continue + + cluster += 1 + + leaf.label_point(rec, cluster) + yield dbscan_record(rec) + + queue = deque([neighbors]) + DEBUG.start_timing('internal_per_cluster') + while len(queue) > 0: + for next_id in queue.popleft(): + rec2 = records[next_id] + if rec2.cluster_id != label.NOISE_POINT and \ + rec2.cluster_id != label.UNLABELLED: + # We delete all points from the unlabelled + # rtree as soon as we assign them to a cluster. + # But we will still get here if the same point + # gets returned by more than one range query + # and added to the queue before the first reference + # to it gets popped and processed. 
The point will + # be labelled as a part of this cluster the first time + # it gets popped. We can ignore any duplicate references + # to it that get popped after that. + continue + + DEBUG.start_timing('internal_rangequery') + neighbors = leaf.range_query(rec2) + DEBUG.print_timing('internal_rangequery') + + if not rec2.is_core_point: # rec2.is_core_point field will be set inside range_query(rec2) + # Label as border point + leaf.label_point(rec2, cluster) + yield dbscan_record(rec2) + continue + + queue.append(neighbors) + # Label as core point + leaf.label_point(rec2, cluster) + yield dbscan_record(rec2) + + DEBUG.print_timing('internal_per_cluster') + + DEBUG.print_timing("dbscan_leaf_internal") + pstats.show("internal") + + # Return rows for external records (one for each + # cluster an external point is close to) + DEBUG.start_timing("yield_external_records") + for ext_rec in leaf.yield_external_records(): + yield ext_rec + DEBUG.print_timing("yield_external_records") + + # Clean up for next time, if there is one + del SD[dist_id] + + DEBUG.print_timing("dbscan_leaf") # The snowflake table is used to reduce the size of the edge table. 
# Note that the sole purpose of the edge table is finding the connected @@ -709,14 +1506,21 @@ def sf_final(state, **kwargs): return state def _validate_dbscan(schema_madlib, source_table, output_table, id_column, - expr_point, eps, min_samples, metric, algorithm, depth): + expr_point, eps, min_samples, metric, algorithm, max_depth): input_tbl_valid(source_table, 'dbscan') output_tbl_valid(output_table, 'dbscan') output_summary_table = add_postfix(output_table, '_summary') output_tbl_valid(output_summary_table, 'dbscan') - cols_in_tbl_valid(source_table, [id_column], 'dbscan') + _assert(is_var_valid(source_table, id_column), + "dbscan error: {0} is an invalid column or " + "expression for id param".format(id_column)) + + id_col_type = get_expr_type(id_column, source_table) + _assert(is_valid_psql_type(id_col_type, INTEGER), + "dbscan Error: unique id column or expression '{0}' in input table must" + " evaluate to an integer type.".format(id_column)) _assert(is_var_valid(source_table, expr_point), "dbscan error: {0} is an invalid column name or " @@ -724,19 +1528,22 @@ def _validate_dbscan(schema_madlib, source_table, output_table, id_column, point_col_type = get_expr_type(expr_point, source_table) _assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY), - "dbscan Error: Feature column or expression '{0}' in train table is not" + "dbscan Error: Feature column or expression '{0}' in input table is not" " a numeric array.".format(expr_point)) _assert(eps > 0, "dbscan Error: eps has to be a positive number") _assert(min_samples > 0, "dbscan Error: min_samples has to be a positive number") - fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2', 'dist_angle', 'dist_tanimoto'] - _assert(metric in fn_dist_list, "dbscan Error: metric has to be one of the madlib defined distance functions") + fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2', + 'dist_angle', 'dist_tanimoto'] + _assert(metric in fn_dist_list, + "dbscan Error: metric has to 
be one of the madlib defined distance functions") - _assert(algorithm == BRUTE_FORCE or RTREE_ENABLED == 1, - "dbscan Error: Cannot use kd_tree without the necessary python module: rtree") - _assert(depth > 0, "dbscan Error: depth has to be a positive number") + _assert(algorithm == METHOD_BRUTE_FORCE or RTREE_ENABLED == 1, + "dbscan Error: Cannot use rtree without the necessary python module: rtree") + _assert(max_depth is None or max_depth >= 0, + "dbscan Error: max_segmentation_depth must be a non-negative integer or NULL") def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, expr_point, output_table): @@ -747,7 +1554,14 @@ def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, input_tbl_valid(dbscan_summary_table, 'dbscan') output_tbl_valid(output_table, 'dbscan') - cols_in_tbl_valid(source_table, [id_column], 'dbscan') + _assert(is_var_valid(source_table, id_column), + "dbscan error: {0} is an invalid column name or " + "expression for id param".format(id_column)) + + id_col_type = get_expr_type(id_column, source_table) + _assert(is_valid_psql_type(id_col_type, INTEGER), + "dbscan Error: unique id column or expression '{0}' in input table must" + " evaluate to an integer type.".format(id_column)) _assert(is_var_valid(source_table, expr_point), "dbscan error: {0} is an invalid column name or " @@ -779,14 +1593,24 @@ def dbscan_help(schema_madlib, message=None, **kwargs): SELECT {schema_madlib}.dbscan( source_table, -- Name of the training data table output_table, -- Name of the output table - id_column, -- Name of id column in source_table + id_column, -- A column or expression of columns specifying a unique id for rows in the source_table expr_point, -- Column name or expression for data points eps, -- The minimum radius of a cluster min_samples, -- The minimum size of a cluster metric, -- The name of the function to use to calculate the -- distance - algorithm, -- The algorithm to use for dbscan: brute or kd_tree 
- depth -- The depth for the kdtree algorithm + algorithm, -- The algorithm to use for dbscan: brute_force or optimized (default=optimized) + max_segmentation_depth -- This has no effect on algorithm=brute_force. For algorithm=optimized, this + limits the number of splits to be considered while dividing + the dataset into smaller spatial regions for optimal parallel performance. + The default ( NULL or omitted ) value means it will be unlimited, + but note that it will never break the data up into more regions than + there are segments in a Greenplum cluster, and in some cases may choose fewer. + Setting this to non-NULL will decrease up-front overhead, and can + be useful if you expect to find only a small number of densely populated + clusters, or if the dataset itself is small enough that initial planning + overhead matters more than scaling performance. This parameter will + not affect the output, only the performance. ); ----------------------------------------------------------------------- @@ -794,10 +1618,10 @@ SELECT {schema_madlib}.dbscan( ----------------------------------------------------------------------- The output of the dbscan function is a table with the following columns: -id_column The ids of test data point -cluster_id The id of the points associated cluster +id The id of each point which has been assigned to a cluster +cluster_id The cluster_id assigned to each point is_core_point Boolean column that indicates if the point is core or not -points The column or expression for the data point +point The coordinates of each classified point """ else: help_string = """ diff --git a/src/ports/postgres/modules/dbscan/dbscan.sql_in b/src/ports/postgres/modules/dbscan/dbscan.sql_in index 7e1df05..41a96a2 100644 --- a/src/ports/postgres/modules/dbscan/dbscan.sql_in +++ b/src/ports/postgres/modules/dbscan/dbscan.sql_in @@ -84,11 +84,11 @@ dbscan( source_table, <dd>TEXT. Name of the table containing the clustering results. </dd> -<dt>id_column</dt> -<dd>TEXT.
Name of the column containing a unique integer id for each training point. +<dt>id</dt> +<dd>TEXT. Name of a column or expression containing a unique integer id for each training point. </dd> -<dt>expr_point</dt> +<dt>point</dt> <dd>TEXT. Name of the column with point coordinates in array form, or an expression that evaluates to an array of point coordinates. </dd> @@ -131,13 +131,36 @@ performance reasons.</li> </dd> <dt>algorithm (optional)</dt> -<dd>TEXT, default: 'brute_force'. The name of the algorithm -used to compute clusters. Currently only brute force is supported: +<dd>TEXT, default: 'optimized'. The name of the algorithm +used to compute clusters. <ul> -<li><b>\ref brute_force</b>: Produces an exact result by searching -all points in the search space. Brute force can be slow and is intended -to be used for small datasets only. You can also use a short -form "b" or "brute" etc. to select brute force.</li> +<li><b>\ref brute_force</b>: Brute force can be slow and should +not be used for large datasets. However, it does have less initial +setup overhead than the parallel-optimized algorithm, which may make +it faster for some cases involving small datasets. You can also use +a short form "b" or "brute" etc. to select brute force.</li> + +<li><b>\ref optimized</b>: This uses an rtree index to accelerate range queries +while performing the cluster detection. It is designed with gpdb in mind, +in which case it intelligently partitions the space into a number of connected +regions, runs DBSCAN in parallel on each region, and then merges the results +together. By default, the maximum number of regions it will consider using +is equal to the number of segments in the database, but if you suspect it +may be spending too much time on the segmentation, this can be limited further +by setting the max_segmentation_depth parameter to something lower. This should +decrease the segmentation overhead, but will also decrease the amount of parallelism.
+If the dataset is relatively small, another option to consider is brute force, +which has very little overhead but won't scale as well. For large enough datasets, +and an appropriate choice of eps, this algorithm should be able to run in O((N/S) log N) +time where N is the number of rows (points) in the input and S is the number of regions +(often equal to the number of segments, but may be less). (Worst case for brute force +is O(N^2).) eps should be chosen based on the density of the dataset; DBSCAN works +best on datasets where the clusters have a roughly uniform density. If eps is chosen +too large, then the runtime will explode and nearly every point will be considered +connected to every other point in one large cluster. Best practice is to start with +a relatively small eps, so it can return the first result faster; if it looks +like there are too many small clusters, increase eps and allow it to run for longer. + </ul></dd> </dl> @@ -146,23 +169,25 @@ form "b" or "brute" etc. to select brute force.</li> The output table for DBSCAN module has the following columns: <table class="output"> <tr> - <th>id_column</th> - <td>INTEGER. Test data point id.</td> + <th>id</th> + <td>SMALLINT|INTEGER|BIGINT. A column or expression which specifies + unique ids for each row in the training dataset. Must evaluate + to an integer type.</td> </tr> <tr> <th>cluster_id</th> - <td>INTEGER. Cluster id associated with the test data point.</td> + <td>INTEGER. The cluster id of each classified data point.</td> </tr> <tr> <th>is_core_point</th> - <td>BOOLEAN. Indicates if the test data point is core + <td>BOOLEAN. Indicates if the training data point is core. If it is not a core point and listed in the output table, it is a border point. Noise points are not shown in this table.</td> </tr> <tr> - <th>__points__</th> - <td>TEXT. Column or expression for the data point.</td> + <th>point</th> + <td>TEXT.
The coordinates of each classified point.</td> </tr> </table> @@ -174,8 +199,8 @@ After clustering, the cluster assignment for each data point can be computed: <pre class="syntax"> dbscan_predict( dbscan_table, source_table, - id_column, - expr_point, + id, + point, output_table ) </pre> @@ -190,11 +215,12 @@ dbscan_predict( dbscan_table, <dd>TEXT. Name of the table containing the input data points. </dd> -<dt>id_column</dt> -<dd>TEXT. Name of the column containing a unique integer id for each training point. +<dt>id</dt> +<dd>VARCHAR. A column name or expression which selects a unique integer + id for each training point. </dd> -<dt>expr_point</dt> +<dt>point</dt> <dd>TEXT. Name of the column with point coordinates in array form, or an expression that evaluates to an array of point coordinates. </dd> @@ -209,8 +235,8 @@ or an expression that evaluates to an array of point coordinates. The output is a table with the following columns: <table class="output"> <tr> - <th>id_column</th> - <td>INTEGER. ID column passed to the function.</td> + <th>id</th> + <td>BIGINT.
The unique id of each row, as it was passed in.</td> </tr> <tr> <th>cluster_id</th> @@ -222,7 +248,6 @@ The output is a table with the following columns: </tr> </table> - @anchor examples @par Examples @@ -322,7 +347,7 @@ The summary table lists the 'eps' value and the distance metric used: SELECT * FROM dbscan_result_summary; </pre> <pre class="result"> - id_column | eps | metric + id | eps | metric -----------+------+------------ pid | 1.75 | dist_norm2 </pre> @@ -378,7 +403,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan( min_samples INTEGER, metric VARCHAR, algorithm VARCHAR, - depth INTEGER + max_segmentation_depth INTEGER ) RETURNS VOID AS $$ PythonFunction(dbscan, dbscan, dbscan) $$ LANGUAGE plpythonu VOLATILE @@ -415,7 +440,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan( source_table VARCHAR, output_table VARCHAR, id_column VARCHAR, - expr_point VARCHAR, + point VARCHAR, eps DOUBLE PRECISION, min_samples INTEGER ) RETURNS VOID AS $$ @@ -473,30 +498,30 @@ $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge( - state1 INTEGER[][], - state2 INTEGER[][] -) RETURNS INTEGER[][] AS $$ + state1 BIGINT[][], + state2 BIGINT[][] +) RETURNS BIGINT[][] AS $$ PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') return dbscan.sf_merge(**globals()) $$ LANGUAGE plpythonu m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final( - state INTEGER[][] -) RETURNS INTEGER[][] AS $$ + state BIGINT[][] +) RETURNS BIGINT[][] AS $$ PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') return dbscan.sf_final(**globals()) $$ LANGUAGE plpythonu m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition( - state INTEGER[], + state BIGINT[], src BIGINT, dest BIGINT, n_rows INTEGER[], gp_segment_id INTEGER -) RETURNS INTEGER[][] AS $$ +) RETURNS BIGINT[][] AS $$
PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') return dbscan.sf_transition(**globals()) $$ LANGUAGE plpythonu @@ -513,77 +538,245 @@ CREATE AGGREGATE MADLIB_SCHEMA.build_snowflake_table( INTEGER[], INTEGER )( - STYPE=INTEGER[][], + STYPE=BIGINT[][], SFUNC=MADLIB_SCHEMA.sf_transition, m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,') FINALFUNC=MADLIB_SCHEMA.sf_final ); -CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_transition( - state INTEGER[], - id_in BIGINT, - expr_points DOUBLE PRECISION[], - eps DOUBLE PRECISION, - min_samples INTEGER, - metric VARCHAR, - n_rows INTEGER[], - leaf_id INTEGER - -) RETURNS INTEGER[][] AS $$ -PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') - return dbscan.find_core_points_transition(**globals()) -$$ LANGUAGE plpythonu -m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); - -CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_merge( - state1 INTEGER[][], - state2 INTEGER[][] -) RETURNS INTEGER[][] AS $$ -PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') - return dbscan.find_core_points_merge(**globals()) -$$ LANGUAGE plpythonu -m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); - -CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_final( - state INTEGER[][] -) RETURNS INTEGER[] AS $$ -PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan') - return dbscan.find_core_points_final(**globals()) -$$ LANGUAGE plpythonu -m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); - ---id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, -DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.find_core_points( - BIGINT, - DOUBLE PRECISION[], - DOUBLE PRECISION, - INTEGER, - VARCHAR, - INTEGER[], - INTEGER); -CREATE AGGREGATE MADLIB_SCHEMA.find_core_points( - BIGINT, - DOUBLE PRECISION[], - DOUBLE PRECISION, - INTEGER, - VARCHAR, - INTEGER[], - INTEGER -)( - STYPE=INTEGER[][], - SFUNC=MADLIB_SCHEMA.find_core_points_transition, - m4_ifdef(`__POSTGRESQL__', `', 
`prefunc=MADLIB_SCHEMA.find_core_points_merge,') - FINALFUNC=MADLIB_SCHEMA.find_core_points_final +DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_record CASCADE; +CREATE TYPE MADLIB_SCHEMA.__dbscan_record AS ( + id BIGINT, + point DOUBLE PRECISION[], + is_core_point BOOL, + cluster_id BIGINT, + leaf_id INT, + dist_id INT ); -DROP TYPE IF EXISTS MADLIB_SCHEMA.unpacked_2d CASCADE; -CREATE TYPE MADLIB_SCHEMA.unpacked_2d AS ( +DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_edge CASCADE; +CREATE TYPE MADLIB_SCHEMA.__dbscan_edge AS ( src BIGINT, dest BIGINT); -CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.unpack_2d (packed INTEGER[][]) - RETURNS SETOF MADLIB_SCHEMA.unpacked_2d +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_unpack_edge (packed BIGINT[][]) + RETURNS SETOF MADLIB_SCHEMA.__dbscan_edge AS $$ return packed $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_safe_ln(x DOUBLE PRECISION) +RETURNS DOUBLE PRECISION AS +$$ + SELECT CASE WHEN $1 > 0.0 + THEN ln($1) + ELSE -1000.0 + END +$$ LANGUAGE SQL IMMUTABLE; + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_safe_exp(x DOUBLE PRECISION) +RETURNS DOUBLE PRECISION AS +$$ + SELECT CASE WHEN $1 > -600.0 + THEN exp($1) + ELSE 0.0 + END +$$ LANGUAGE SQL IMMUTABLE; + +DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.prod(DOUBLE PRECISION); +CREATE AGGREGATE MADLIB_SCHEMA.prod(DOUBLE PRECISION) +( + sfunc = float8mul, + stype = DOUBLE PRECISION +); + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_split_volume( + V DOUBLE PRECISION, -- $1 + eps_bin DOUBLE PRECISION, -- $2 + eps_bins DOUBLE PRECISION -- $3 +) RETURNS DOUBLE PRECISION +AS +$$ SELECT + $2 / $3 * $1 +$$ LANGUAGE SQL IMMUTABLE; + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_estimate_neighbors( + internal_points DOUBLE PRECISION, -- $1 + V DOUBLE PRECISION, -- $2 + eps DOUBLE PRECISION, -- $3 + D BIGINT -- $4 +) RETURNS DOUBLE PRECISION +AS +$$ SELECT + CASE WHEN $2 
> 0.0 + THEN + $1 * ($3^$4 / $2) + ELSE + 1.0 + END +$$ LANGUAGE SQL IMMUTABLE; + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_range_query_penalty( + internal_points BIGINT, -- $1 + neighbors DOUBLE PRECISION -- $2 +) RETURNS DOUBLE PRECISION +AS +$$ SELECT + greatest( + $2, + CASE WHEN $1 > 2 + -- 60x faster than log(internal_points, 2.0) + -- due to DOUBLE PRECISION types instead of NUMERIC + THEN ceil(ln($1::DOUBLE PRECISION) / ln(2.0::DOUBLE PRECISION)) + ELSE 1.0 + END + ) +$$ LANGUAGE SQL IMMUTABLE; + +DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_losses; +CREATE TYPE MADLIB_SCHEMA.__dbscan_losses AS ( + left_avail BIGINT, + right_avail BIGINT, + left_loss DOUBLE PRECISION, + right_loss DOUBLE PRECISION +); + +-- Decide how much of node's available segments to allocate +-- to its left child. The rest will be allocated to its +-- right child +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_allocate_segs( + avail_segs BIGINT, -- $1 + left_points DOUBLE PRECISION, -- $2 + right_points DOUBLE PRECISION, -- $3 + rqp_left DOUBLE PRECISION, -- $4 + rqp_right DOUBLE PRECISION -- $5 +) RETURNS BIGINT AS +$$ + WITH p(left_penalty, right_penalty) AS ( + SELECT + $4 * $2, + $5 * $3 + ) + SELECT + round( + left_penalty * $1 / (left_penalty + right_penalty) + )::BIGINT + FROM p +$$ LANGUAGE SQL IMMUTABLE; + +-- Given internal and external point counts in left and right regions, +-- returns optimal number of segments to allocate to left and right +-- nodes, as well as their losses. +-- +-- TODO: we can probably remove range-query neighbor penalty for +-- external points if we use rtree.count() instead of +-- rtree.intersection(). 
Just need to finish adding the +-- per-cluster rtrees +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_segmentation_loss( + left_internal BIGINT, -- $1 + right_internal BIGINT, -- $2 + left_external BIGINT, -- $3 + right_external BIGINT, -- $4 + avail_segs BIGINT, -- $5 + V DOUBLE PRECISION, -- $6 + eps DOUBLE PRECISION, -- $7 + D BIGINT, -- $8 + eps_bin DOUBLE PRECISION, -- $9 + eps_bins DOUBLE PRECISION -- $10 + ) RETURNS MADLIB_SCHEMA.__dbscan_losses AS +$$ + WITH __local AS ( + SELECT + ($1 + $3) left_points, + ($2 + $4) right_points, + $9 left_eps_bins, + $10 - $9 right_eps_bins, + $10 all_eps_bins + ), penalties AS ( + SELECT + MADLIB_SCHEMA.__dbscan_range_query_penalty( + $1 + $2, + MADLIB_SCHEMA.__dbscan_estimate_neighbors( + $1 + $2, + $6, $7, $8 + ) + ) rqp_all, + MADLIB_SCHEMA.__dbscan_range_query_penalty( + $1, + MADLIB_SCHEMA.__dbscan_estimate_neighbors( + $1, + MADLIB_SCHEMA.__dbscan_split_volume( + $6, + left_eps_bins, + all_eps_bins + ), + $7, + $8 + ) + ) rqp_left, + MADLIB_SCHEMA.__dbscan_range_query_penalty( + $2, + MADLIB_SCHEMA.__dbscan_estimate_neighbors( + $2, + MADLIB_SCHEMA.__dbscan_split_volume( + $6, + left_eps_bins, + all_eps_bins + ), $7, $8 + ) + ) rqp_right + FROM __local + ), seg_alloc(left_avail, all_penalty) AS ( + SELECT + MADLIB_SCHEMA.__dbscan_allocate_segs( + $5, + left_points, + right_points, + rqp_left, + rqp_right + ), + (left_points + right_points) * rqp_all + FROM __local, penalties + ) + SELECT + CASE WHEN + 0 < left_avail AND left_avail < $5 + THEN + ROW( + left_avail, + $5 - left_avail, + (left_points * 1.0 / left_avail) * rqp_left, + (right_points * 1.0 / ($5 - left_avail)) * rqp_right + )::MADLIB_SCHEMA.__dbscan_losses + ELSE -- forget about splitting, not worth it + ROW( + $5, + 0, + -- We intentionally do not divide by avail_segs, + -- because if we choose not to split, then there can be no + -- more division later on + (left_points + right_points) * rqp_all, + 0.0 + )::MADLIB_SCHEMA.__dbscan_losses + END + FROM 
__local, penalties, seg_alloc +$$ LANGUAGE SQL IMMUTABLE; + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_leaf( + db_rec MADLIB_SCHEMA.__dbscan_record, + eps REAL, + min_samples INT, + metric TEXT, + num_internal_points BIGINT, + num_external_points BIGINT +) RETURNS SETOF MADLIB_SCHEMA.__dbscan_record AS +$$ + global SD + args.append(SD) + import plpy + + PythonFunctionBodyOnlyNoSchema(dbscan,dbscan) + WithTracebackForwarding(return dbscan.dbscan_leaf(*args)) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); diff --git a/src/ports/postgres/modules/dbscan/test/dbscan.sql_in b/src/ports/postgres/modules/dbscan/test/dbscan.sql_in index 0e79e98..5756b99 100644 --- a/src/ports/postgres/modules/dbscan/test/dbscan.sql_in +++ b/src/ports/postgres/modules/dbscan/test/dbscan.sql_in @@ -43,18 +43,20 @@ copy dbscan_train_data (id_in, data) FROM stdin delimiter '|'; DROP TABLE IF EXISTS out1, out1_summary, out1_predict; SELECT dbscan('dbscan_train_data','out1','id_in','data',20,4,'squared_dist_norm2','brute'); -SELECT assert(count(DISTINCT id_in) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id_in=ANY(ARRAY[1,2,3,4,5]); +SELECT assert(count(DISTINCT id) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id=ANY(ARRAY[1,2,3,4,5]); -SELECT assert(count(DISTINCT id_in) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id_in=ANY(ARRAY[6,7,8,9]); +SELECT assert(count(DISTINCT id) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id=ANY(ARRAY[6,7,8,9]); + +SELECT assert(id_column = 'id_in', 'id_column field in summary table should have been ''id_in''') FROM out1_summary; SELECT dbscan_predict('out1', 'dbscan_train_data', 'id_in', 'data', 'out1_predict'); SELECT assert(count(DISTINCT cluster_id) = 2, 'Incorrect cluster count') FROM out1_predict; -DROP TABLE IF EXISTS out1, out1_summary, out1_predict; +DROP TABLE IF EXISTS out1, out1_summary; SELECT 
dbscan('dbscan_train_data','out1','id_in','data',20,4,'dist_norm2','brute'); -DROP TABLE IF EXISTS out1, out1_summary, out1_predict; +DROP TABLE IF EXISTS out1, out1_summary; SELECT dbscan('dbscan_train_data','out1','id_in','data',20,4,'dist_norm1','brute'); DROP TABLE IF EXISTS dbscan_train_data2; @@ -97,16 +99,16 @@ INSERT INTO dbscan_test_data2 VALUES DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary, dbscan_predict_out; SELECT dbscan( -'dbscan_train_data2', -- source table +'dbscan_train_data2', -- source table 'dbscan_result', -- output table 'pid', -- point id column -'points', -- data point +'points', -- data point 1.75, -- epsilon 4, -- min samples 'dist_norm2', -- metric -'brute_force'); -- algorithm +'brute_force'); -- algorithm -SELECT * FROM dbscan_result ORDER BY pid; +SELECT * FROM dbscan_result ORDER BY id; SELECT assert(count(DISTINCT cluster_id) = 3, 'Incorrect cluster count') FROM dbscan_result; @@ -114,42 +116,82 @@ SELECT dbscan_predict('dbscan_result', 'dbscan_test_data2', 'pid', 'points', 'db SELECT assert(count(DISTINCT cluster_id) = 3, 'Incorrect cluster count') FROM dbscan_predict_out; -DROP TABLE IF EXISTS dbscan_train_data3; -CREATE TABLE dbscan_train_data3 ( -id_in integer, -data0 integer, -data1 integer); -copy dbscan_train_data3 (id_in, data0, data1) FROM stdin delimiter '|'; -1|1|1 -2|2|2 -3|3|3 -4|4|4 -5|4|8 -6|17|8 -7|19|8 -8|19|9 -9|19|10 -10|3|111 -11|3|112 -12|3|113 -13|8|113 +DROP TABLE IF EXISTS dbscan_cities; +CREATE TABLE dbscan_cities ( + city TEXT, + country TEXT, + lat DOUBLE PRECISION, + lon DOUBLE PRECISION, + PRIMARY KEY (city, country) +); +COPY dbscan_cities (city, country, lat, lon) FROM stdin csv header; +city,country,lat,lon +Bangkok,Thailand,13.7563,100.5018 +Athens,Greece,37.9838,23.7275 +Beijing,China,39.9042,116.4074 +St. 
Petersburg,Russia,59.9311,30.3609 +Barcelona,Spain,41.3874,2.1686 +Tehran,Iran,35.6892,51.3890 +Bristol,England,51.4545,-2.5879 +São Paulo,Brazil,-23.5558,-46.6396 +Berlin,Germany,52.5200,13.4050 +New Dehli,India,28.6139,77.2090 +Sydney,Australia,-34,151 +Bogotá,Colombia,4,-74 +Kingston,Jamaica,17,-76 +Rome,Italy,41,12 +Bombay,India,19,72 +Cape Town,South Africa,-33,22 +Cairo,Egypt,30,31 +Guayaquil,Ecuador,-2,-79 +Jakarta,Indonesia,-6,106 +Hamburg,Germany,53.5511,9.9937 +Hong Kong,China,22,114 +Bordeux,France,44,0 +Nairobi,Kenya,-1.2921,36.8219 +Osaka,Japan,34,135 \. -DROP TABLE IF EXISTS out1, out1_summary; -SELECT dbscan('dbscan_train_data3','out1','id_in','ARRAY[data0,data1]',20,4,'squared_dist_norm2','brute'); +\set unit_3d_point_expr ARRAY[COS(radians(lat))*COS(radians(lon)), COS(radians(lat))*SIN(radians(lon)), SIN(radians(lat))] +\set id_expr '(''x'' || substr(md5(city || '','' || country),1,16))::BIT(64)::BIGINT' -DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary, dbscan_predict_out; +-- Test that id and point params may be passed in as expressions +DROP TABLE IF EXISTS dbscan_cities_out, dbscan_cities_out_summary; +SELECT dbscan('dbscan_cities', + 'dbscan_cities_out', + :'id_expr', + :'unit_3d_point_expr', + radians(22), -- eps = 22 degrees + 3, -- min_samples = 3 + 'dist_angle', + 'brute' +); + +SELECT assert( + id_column = $$('x' || substr(md5(city || ',' || country),1,16))::BIT(64)::BIGINT$$, + 'id expression passed in does not match expression saved in summary table: ' || id_column +) FROM dbscan_cities_out_summary; +SELECT assert( + expr_point = :'unit_3d_point_expr', + 'point expression passed in does not match expression saved in summary table: ' || expr_point +) FROM dbscan_cities_out_summary; -- verify point expression saved in summary +SELECT assert(COUNT(*) = 18, 'Wrong number of rows returned. Expected: 18, Actual: ' || COUNT(*)) FROM dbscan_cities_out; +SELECT assert(COUNT(DISTINCT cluster_id) = 3, 'Wrong number of clusters found.
Expected: ' || 3 || ', Actual: ' || COUNT(DISTINCT cluster_id)) FROM dbscan_cities_out; +SELECT assert(COUNT(*) = 15, 'Wrong number of core points found. Expected: ' || 15 || ', Actual: ' || COUNT(*)) FROM dbscan_cities_out WHERE is_core_point; +SELECT assert(ARRAY(SELECT city FROM dbscan_cities JOIN (SELECT id FROM dbscan_cities_out WHERE cluster_id IN (SELECT cluster_id FROM dbscan_cities_out b GROUP BY cluster_id HAVING (COUNT(*)) < 5)) x ON id = :id_expr ORDER BY id) = ARRAY['Bogotá','Kingston','Guayaquil'], 'Wrong cities in smallest cluster, expected: Bogotá, Kingston, Guayaquil'); + +DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary; SELECT dbscan( -'dbscan_train_data2', -- source table +'dbscan_train_data2', -- source table 'dbscan_result', -- output table 'pid', -- point id column -'points', -- data point +'points', -- data point 1.75, -- epsilon 4, -- min samples 'dist_norm2', -- metric -'brute_force'); -- algorithm +'brute_force'); -- algorithm --- Test kd-tree +-- Test optimized dbscan DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary; SELECT dbscan( @@ -160,17 +202,16 @@ SELECT dbscan( 1.75, -- epsilon 4, -- min samples 'dist_norm2', -- metric -'kd_tree', -- algorithm +'optimized', -- algorithm 2); -- depth -SELECT * FROM dbscan_result ORDER BY pid; +SELECT * FROM dbscan_result ORDER BY id; - -CREATE TABLE dbscan_kd_test_data ( +CREATE TABLE dbscan_opt_test_data ( row_id integer, row_vec double precision[] ); -COPY dbscan_kd_test_data (row_id, row_vec) FROM stdin DELIMITER '|'; +COPY dbscan_opt_test_data (row_id, row_vec) FROM stdin DELIMITER '|'; 1 | {-123.932588658681,312.674262022751,24.5140517556683} 2 | {-1011.71837586596,294.857038270306,-596.339561040807} 3 | {51.8496080463916,-392.173152861615,188.5097494329} @@ -1174,23 +1215,24 @@ COPY dbscan_kd_test_data (row_id, row_vec) FROM stdin DELIMITER '|'; \. 
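The id expression exercised in the city tests above, `('x' || substr(md5(city || ',' || country),1,16))::BIT(64)::BIGINT`, can be mirrored in a few lines of Python to see what it computes. This is an illustrative sketch, not MADlib code; the function name is made up:

```python
import hashlib

def city_id(city, country):
    # First 16 hex digits of md5("city,country") are the top 64 bits of
    # the digest; casting through BIT(64) to BIGINT in SQL reinterprets
    # them as a signed 64-bit integer, so wrap with two's complement.
    h = hashlib.md5((city + ',' + country).encode('utf-8')).hexdigest()[:16]
    u = int(h, 16)                                   # unsigned 64-bit value
    return u - (1 << 64) if u >= (1 << 63) else u    # signed BIGINT range
```

Because md5 is deterministic, the expression yields a stable, effectively unique BIGINT per (city, country) pair, which is what lets the test pass an expression rather than an id column.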
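The planner helpers defined earlier, `__dbscan_range_query_penalty` and `__dbscan_allocate_segs`, translate almost line-for-line into Python. A sketch for sanity-checking the cost model (note that SQL `round()` may break ties differently from Python's banker's rounding, so avoid exact-half inputs when comparing):

```python
import math

def range_query_penalty(internal_points, neighbors):
    # Expected cost of one range query: either walking the expected
    # neighbor list, or descending an rtree of depth ~ceil(log2(n)),
    # whichever dominates. Mirrors __dbscan_range_query_penalty.
    if internal_points > 2:
        tree_depth = math.ceil(math.log(internal_points) / math.log(2.0))
    else:
        tree_depth = 1.0
    return max(neighbors, tree_depth)

def allocate_segs(avail_segs, left_points, right_points, rqp_left, rqp_right):
    # Split the available segments between the two children in proportion
    # to each child's estimated total range-query work.
    # Mirrors __dbscan_allocate_segs.
    left_penalty = rqp_left * left_points
    right_penalty = rqp_right * right_points
    return round(left_penalty * avail_segs / (left_penalty + right_penalty))
```

The `ln(x)/ln(2)` form matches the SQL comment about avoiding `log(x, 2.0)`, which would route through NUMERIC in Postgres and be far slower than DOUBLE PRECISION arithmetic.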
 DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary;
-SELECT dbscan('dbscan_kd_test_data','dbscan_result','row_id','row_vec',50, 3, 'dist_norm2', 'br');
+SELECT dbscan('dbscan_opt_test_data','dbscan_result','row_id','row_vec',50, 3, 'dist_norm2', 'br');

-DROP TABLE IF EXISTS dbscan_result_kd, dbscan_result_kd_summary;
-SELECT dbscan('dbscan_kd_test_data','dbscan_result_kd','row_id','row_vec',50, 3, 'dist_norm2', 'kd', 2);
+DROP TABLE IF EXISTS dbscan_result_opt, dbscan_result_opt_summary;
+SELECT dbscan('dbscan_opt_test_data','dbscan_result_opt','row_id','row_vec',50, 3, 'dist_norm2', 'optimized', 2);

-SELECT assert(count(row_id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect cluster counts for brute')
+SELECT assert(count(id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect cluster counts for brute')
 FROM dbscan_result;

-SELECT assert(count(row_id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect cluster counts for kd')
-FROM dbscan_result_kd;
+SELECT assert(count(id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect cluster counts for optimized dbscan')
+FROM dbscan_result_opt;

 CREATE TABLE temp_result AS
-SELECT row_id
+SELECT id
 FROM dbscan_result
-WHERE cluster_id = (SELECT cluster_id FROM dbscan_result WHERE row_id = 4);
+WHERE cluster_id = (SELECT cluster_id FROM dbscan_result WHERE id = 4);

 SELECT assert(count(*) = 6, 'Clusters do not match')
-FROM dbscan_result_kd
-WHERE cluster_id = (SELECT cluster_id FROM dbscan_result_kd WHERE row_id = 4)
-    AND row_id IN (SELECT row_id FROM temp_result);
+FROM dbscan_result_opt
+WHERE cluster_id = (SELECT cluster_id FROM dbscan_result_opt WHERE id = 4)
+    AND id IN (SELECT id FROM temp_result);
+
diff --git a/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in b/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
index 1aa0681..3c71e73 100644
--- a/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
+++ b/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
@@ -19,18 +19,18 @@
 import sys
 import numpy as np
+from collections import Counter
 from os import path
-
-# Add modules to the pythonpath.
-sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))
-sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
-
 import unittest
 from mock import *
 import plpy_mock as plpy

 m4_changequote(`<!', `!>')

+# Add modules to the pythonpath.
+sys.path.append(path.join(path.dirname(path.abspath(__file__)), '../../..'))
+sys.path.append(path.join(path.dirname(path.abspath(__file__)), '../..'))
+
 class DBSCANTestCase(unittest.TestCase):

     def setUp(self):
@@ -52,27 +52,343 @@ class DBSCANTestCase(unittest.TestCase):
             'output_table' : "output",
             'id_column' : "id_in",
             'expr_point' : "data",
-            'eps' : 20,
-            'min_samples' : 4,
-            'metric' : 'squared_dist_norm2',
+            'eps' : 3.0,
+            'min_samples' : 1,
+            'metric' : 'dist_norm2',
             'algorithm' : 'brute',
-            'depth': 2
+            'max_segmentation_depth': 5
         }
         import dbscan.dbscan
         self.module = dbscan.dbscan
+        self.module.pstats = self.module.dbscan_perfstats()
+
+        dbscan_record = self.module.dbscan_record
+        dbscan_internal_record = self.module.dbscan_internal_record
+        dbscan_external_record = self.module.dbscan_external_record
+
+        test_ids = [ 5555, 0, 66, 21, 43, 11 ]
+        points = [ [ 5.2, 8.9, -6.1, 0.0 ], [ 0.0, -1.9, 5.6, 1.4 ],
+                   [ 0.0, 0.0, 0.0, 0.0 ], [-1.8, 0.0, 2.0, -1.3],
+                   [ 0.1, -0.1, 0.0, 0.5 ], [-1.0, 2.0, 2.1, -1.4] ]
+        # For reference, here is the table of distances between these test points:
+        #
+        #   In [0]: for i in range(len(points)):
+        #      ...:     print(np.linalg.norm(points - points[i], ord=2, axis=1))
+        #      ...:
+        #   [ 0.         16.80862874 11.97747887 13.98248905 12.01956738 12.45993579]
+        #   [16.80862874  0.          6.07700584  5.20576603  5.95147041  6.02494813]
+        #   [11.97747887  6.07700584  0.          2.98831056  0.51961524  3.37194306]
+        #   [13.98248905  5.20576603  2.98831056  0.          3.29545141  2.15870331]
+        #   [12.01956738  5.95147041  0.51961524  3.29545141  0.          3.69323706]
+        #   [12.45993579  6.02494813  3.37194306  2.15870331  3.69323706  0.        ]
+        #
+        # By choosing eps=3.0, that means the first 2 points have 0 neighbors:
+        #     id=5555 and id=0
+        # The last two points each have 1 neighbor:
+        #     id=66 is the sole neighbor of id=43
+        #     id=21 is the sole neighbor of id=11
+        # And the middle two each have 2 neighbors:
+        #     id=66 and id=21 neighbor each other, plus the neighbor listed above
+        # If min_samples <= 3, then 66, 21, 43, and 11 form a cluster of 4 points,
+        #   where all 4 should be labelled as core for min_samples = 2, but only
+        #   points 66 and 21 are core for min_samples=3.
+        # For min_samples = 1, all 6 points are core, so there are 3 clusters:
+        #   the 2 isolated single-point clusters plus the 4-point cluster.
+        # For min_samples > 3 there are no core points, so all points are noise.
+
+        # Now we assign each point to one of 3 leaves:
+        leaf_ids = [ 1, 2, 6, 6, 1, 2 ]
+
+        # And also add each one as an external point of a different leaf:
+        dist_ids = [ 6, 1, 1, 2, 2, 1 ]  # only used for external recs
+
+        # row_counts metadata for each dist_id:
+        #     dist_id=1: 2 internal points, 3 external (from leaf_ids 2, 6, 2)
+        #     dist_id=2: 2 internal points, 2 external (from leaf_ids 1, 6)
+        #     dist_id=6: 2 internal points, 1 external (from leaf_id 1)
+        self.internal_rowcounts = Counter(leaf_ids)
+        self.external_rowcounts = Counter(dist_ids)
+
+        self.eps = 3.0
+        self.metric = 'dist_norm2'
+
+        db_recs = []
+        db_dicts = []
+        for i in range(len(test_ids)):
+            internal_rec = { 'id' : test_ids[i], 'point' : points[i],
+                             'leaf_id' : leaf_ids[i], 'dist_id' : leaf_ids[i] }
+            db_dicts.append(internal_rec)
+            db_recs.append(dbscan_record.from_dict(internal_rec, self.eps))
+            external_rec = { 'id' : test_ids[i], 'point' : points[i],
+                             'leaf_id' : leaf_ids[i], 'dist_id' : dist_ids[i] }
+            db_dicts.append(external_rec)
+            db_recs.append(dbscan_record.from_dict(external_rec, self.eps))
+
+        self.test_ids = test_ids
+        self.points = points
+        self.leaf_ids = leaf_ids
+        self.dist_ids = dist_ids
+        self.db_dicts = db_dicts
+        self.db_recs = db_recs

     def tearDown(self):
         self.module_patcher.stop()

-    def test_validate_dbscan(self):
+    def test_dbscan_record(self):
+        dbscan_record = self.module.dbscan_record
+        dbscan_internal_record = self.module.dbscan_internal_record
+        dbscan_external_record = self.module.dbscan_external_record
+        label = self.module.label
+        db_recs = self.db_recs
+        db_dicts = self.db_dicts
+        test_ids = self.test_ids
+        points = self.points
+        leaf_ids = self.leaf_ids
+        dist_ids = self.dist_ids
+        eps = self.eps
+        n = len(test_ids)
+
+        # Test creating dbscan_records from dict input
+        j = 0
+        for i in range(n):
+            for t in ['internal', 'external']:
+                db_rec = db_recs[j]
+                self.assertEqual( db_rec.id, test_ids[i] )
+                self.assertEqual( db_rec.point, points[i] )
+                self.assertEqual( db_rec.leaf_id, leaf_ids[i] )
+                self.assertEqual( db_rec.cluster_id, label.UNLABELLED )
+                self.assertFalse( db_rec.is_core_point )
+                if isinstance(db_rec, dbscan_internal_record):
+                    self.assertEqual( db_rec.dist_id, leaf_ids[i] )
+                elif isinstance(db_rec, dbscan_external_record):
+                    self.assertEqual( db_rec.dist_id, dist_ids[i] )
+                else:
+                    self.fail("dbscan_record.from_dict() returned instance of unrecognized class {}".format(type(db_rec)))
+                j += 1
+
+        db_rec = dbscan_record.from_dict(db_recs[0].__dict__, eps)  # 0 neighbors + 1 self
+        min_samples = 1  # For min_samples = 1, every point is a core point
+        self.assertFalse(db_rec.not_core_point(min_samples), "This should be a core point")
+        min_samples = 2  # For min_samples = 2, need at least 1 neighbor to be core
+        self.assertTrue(db_rec.not_core_point(min_samples), "This should NOT be a core point")
+
+        min_samples = 6
+        db_rec._internal_neighbor_count = 4
+        db_rec._external_neighbors = []  # 4 + 0 neighbors + 1 self < min_samples=6
+        self.assertTrue(db_rec.not_core_point(min_samples), "This should NOT be a core point")
+        db_rec._internal_neighbor_count = 2
+        db_rec._external_neighbors = [5, 8, 9]  # 2 + 3 = 5 neighbors + 1 for self >= min_samples=6
+        self.assertFalse(db_rec.not_core_point(min_samples), "This should be a core point")
+
+        # Test cloning of an internal record
+        #   (extra fields added should all have been deleted)
+        db_rec = dbscan_record(db_rec)
+        actual = db_dicts[0]
+        actual['cluster_id'] = 0
+        actual['is_core_point'] = True  # verify cloned as core point
+        self.assertEqual(db_rec.__dict__, actual)
+
+        # Test cloning of an external record
+        db_rec = dbscan_record(db_recs[1])
+        actual = db_dicts[1]
+        actual['cluster_id'] = 0
+        actual['is_core_point'] = False  # verify cloned as non-core point
+        self.assertEqual(db_rec.__dict__, actual)
+
+    def test_leaf_storage(self):
+        # Add some internal and external points, mark some as part of
+        #   clusters, test range queries and assert that:
+        #     1. the right points are returned
+        #     2. neighbor counts and possible_border_points are updated properly
+        #     3. search point has been removed from rtree
+        #
+        #   For the purposes of testing LeafStorage, we ignore leaf_id &
+        #   dist_id, adding all 12 records to the same leaf.
+
+        eps = self.eps
+        min_samples = 3
+        metric = self.metric
+        db_recs = self.db_recs
+
+        db_rec = db_recs[0]
+        LeafStorage = self.module.LeafStorage
+
+        leaf = LeafStorage( 4,
+                            np.float64,
+                            eps,
+                            metric,
+                            min_samples
+                          )
+
+        for db_rec in db_recs:
+            leaf.add_point(db_rec)
+
+        neighbor_counts = [ 1, 1, 3, 3, 2, 2 ]  # Number of neighbors (including self)
+                                                #   of each of the 6 points for eps=3.0
+
+        returned_neighbors = [ [], [], [43, 21], [11], [], [] ]
+
+        core_points = { 21, 66, 43, 11 }  # Points expected to be marked core, with min_samples = 3
+        # Each point should be counted twice (1 internal + 1 external),
+        #   so min_samples = 4 in this case means it has at least 1 actual
+        #   neighbor plus itself
+
+        # Test calling range_query() on external points
+        #   External points must be queried first, so that all of the
+        #   _ext_neighbors lists get updated before we test the internal
+        #   points
+        for i in range(len(neighbor_counts)):
+            db_rec = db_recs[2*i+1]
+            neighbors = leaf.range_query(db_rec)
+            self.assertEqual( len(neighbors), neighbor_counts[i] )
+
+        # Test calling range_query() on internal points
+        for i in range(len(neighbor_counts)):
+            db_rec = db_recs[2*i]
+            neighbors = leaf.range_query(db_rec)
+            self.assertEqual( len(db_rec._external_neighbors), neighbor_counts[i] )
+            self.assertEqual( db_rec._internal_neighbor_count, neighbor_counts[i] - 1 )
+            self.assertItemsEqual( neighbors, returned_neighbors[i],
+                "Expected range_query({}) to return neighbors {}, but returned {}".format(
+                    db_rec.id, returned_neighbors[i], neighbors
+                )
+            )
+            self.assertEqual( db_rec.is_core_point, db_rec.id in core_points,
+                "Unexpected is_core_point={} for internal point id={} after range_query()".format(
+                    db_rec.is_core_point, db_rec.id
+                )
+            )
+
+    # Helper function for test_dbscan_leaf()
+    #   Builds a dictionary of expected dbscan output records
+    #
+    #   Inputs:
+    #     ret_ids: list of expected ids to be returned
+    #     cluster_ids: list of expected cluster labels, in same order
+    #     external_flags: list of booleans indicating which records are external
+    #     core_points: set of ids expected to be labeled as core points
+    #
+    #   Output:  A nested dictionary of output results indexed at first
+    #     level by point ids, and second level by 'internal' vs 'external'
+    #
+    #   There should be at most 1 internal row returned for each internal id,
+    #     so expected_res[id]['internal'] is a single dbscan_record
+    #
+    #   Since dbscan_leaf() should return multiple rows for some external points,
+    #     expected_res[id]['external'] is a list (stack) of the expected external
+    #     dbscan_records for each id, in reverse order of how they are expected
+    #     to be returned.  The 0th element of the list for each id will be None,
+    #     so that if the number of rows returned for any id exceeds the expected
+    #     number, the first unexpected row will be printed properly by assertEqual,
+    #     instead of causing an Exception while trying to access an empty list
+    #     (after all other elements have been popped from the stack, as they are
+    #     compared).
+    #
+    def build_expected_dbscan_leaf_output(self, ret_ids, cluster_ids,
+                                          external_flags, core_points):
+        test_ids = self.test_ids
+        db_recs = self.db_recs
+        dbscan_record = self.module.dbscan_record
+        expected_res = {}
+        id_lookup = { test_ids[i] : i for i in range(len(test_ids)) }
+
+        for i, id in enumerate(ret_ids):
+            j = id_lookup[id]
+            if not id in expected_res:
+                entry = dict()
+                entry['external'] = [ None ]
+                entry['internal'] = None
+                expected_res[id] = entry
+
+            if external_flags[i]:
+                rec = dbscan_record(db_recs[2*j+1])  # start with original external rec
+                # Modify record with expected cluster_id
+                rec.cluster_id = cluster_ids[i]
+                rec.is_core_point = False  # external records should never be marked as core
+                entry['external'].append(rec)
+            else:
+                rec = dbscan_record(db_recs[2*j])  # start with original internal rec
+                # Modify record with expected cluster_id & is_core_point flag
+                rec.cluster_id = cluster_ids[i]
+                rec.is_core_point = id in core_points
+                entry['internal'] = rec
+
+        return expected_res
+
+    def test_dbscan_leaf(self):
+        # Test dbscan_leaf() by calling it with some simple test
+        #   data.  Ensure that:
+        #   1. 1 record is returned for each non-noise point (especially, non-core points)
+        #      and a record for each cluster each external point is near.
+        #      eg. if 1 external point is near 2 clusters, and another is
+        #      near 1 cluster, then 3 records should be returned (+ internal records)
+        #   2. SD is properly removed if an exception is hit, or if last row
+        #   3. External neighbors are included in deciding whether each internal point
+        #      is a core_point.  ie. If an external neighbor brings the total # of
+        #      neighbors (including self) for an internal point up to min_samples, it
+        #      should be marked a core point.
+        #   4. cluster_id's are assigned correctly, and leaf_id's & dist_id's are unchanged
+
+        self.maxDiff = 2000
+        dbscan_record = self.module.dbscan_record
+        eps = self.eps
+        metric = self.metric
+        min_samples = 1
+
+        db_recs = self.db_recs
+        SD = dict()
+
+        # Call dbscan_leaf() with eps=3.0
+        res = []
+        for db_rec in db_recs:
+            num_internal_points = self.internal_rowcounts[db_rec.dist_id]
+            num_external_points = self.external_rowcounts[db_rec.dist_id]
+            state = self.module._dbscan_leaf(db_rec.__dict__, eps, min_samples, metric,
+                                             num_internal_points, num_external_points, SD)
+            res += list(state)
+
+        self.assertEqual(SD, {}, "SD was not properly cleaned up after calls to dbscan_leaf() complete")
+
+        # Expected results for eps=3.0, min_samples=1
+        ret_ids        = [ 5555, 0, 66, 66, 21, 21, 43, 11 ]
+        cluster_ids    = [ 2, 1, 1, 1, 1, 2, 1, 2 ]
+        external_flags = [ False, False, False, True, False, True, False, False ]
+        core_points = { 5555, 0, 66, 21, 43, 11 }  # All internal points should be marked core
+        expected_res = self.build_expected_dbscan_leaf_output(ret_ids, cluster_ids,
+                                                              external_flags, core_points)
+        for output_rec in res:
+            if output_rec.leaf_id == output_rec.dist_id:
+                self.assertEqual(output_rec, expected_res[output_rec.id]['internal'])
+            else:
+                self.assertEqual(output_rec, expected_res[output_rec.id]['external'].pop())
+
+        self.assertEqual(len(res), len(ret_ids),
+            "All {} expected rows match rows returned by dbscan_leaf(), but also got {} unexpected rows!".
+            format(len(ret_ids), len(res)))
+
+        eps = 5.5  # Call dbscan_leaf() again with a different eps
+        res = []
+        for db_rec in db_recs:
+            num_internal_points = self.internal_rowcounts[db_rec.dist_id]
+            num_external_points = self.external_rowcounts[db_rec.dist_id]
+            state = self.module._dbscan_leaf(db_rec.__dict__, eps, min_samples, metric,
+                                             num_internal_points, num_external_points, SD)
+            res += list(state)

-        self.module.input_tbl_valid = Mock()
-        self.module.output_tbl_valid = Mock()
-        self.module.cols_in_tbl_valid = Mock()
-        self.module.is_var_valid = Mock(return_value = True)
-        self.module.get_expr_type = Mock(return_value = 'double precision[]')
+        self.assertEqual(SD, {}, "SD was not properly cleaned up after calls to dbscan_leaf() complete")

-        self.module._validate_dbscan(**self.default_dict)
+        # Expected results for eps=5.5, min_samples=1
+        ret_ids        = [ 5555, 0, 66, 66, 21, 21, 21, 43, 43, 11, 11 ]
+        cluster_ids    = [ 2, 1, 1, 1, 1, 2, 1, 1, 2, 2, 1 ]
+        external_flags = [ False, False, False, True, False, True, True, False, True, False, True ]
+        core_points = { 5555, 0, 66, 21, 43, 11 }
+        expected_res = self.build_expected_dbscan_leaf_output(ret_ids, cluster_ids,
+                                                              external_flags, core_points)
+        for output_rec in res:
+            if output_rec.leaf_id == output_rec.dist_id:
+                self.assertEqual(output_rec, expected_res[output_rec.id]['internal'])
+            else:
+                self.assertEqual(output_rec, expected_res[output_rec.id]['external'].pop())

 if __name__ == '__main__':
     unittest.main()
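Editor's note: the distance table and eps=3.0 neighbor counts hard-coded into the test's setUp() comment can be double-checked outside the database. A minimal standalone sketch (not part of the patch; assumes only NumPy):

```python
# Sanity check for the hand-computed distance table and eps=3.0
# neighbor counts used in DBSCANTestCase.setUp().
import numpy as np

test_ids = [5555, 0, 66, 21, 43, 11]
points = np.array([[ 5.2,  8.9, -6.1,  0.0],
                   [ 0.0, -1.9,  5.6,  1.4],
                   [ 0.0,  0.0,  0.0,  0.0],
                   [-1.8,  0.0,  2.0, -1.3],
                   [ 0.1, -0.1,  0.0,  0.5],
                   [-1.0,  2.0,  2.1, -1.4]])
eps = 3.0

# Full pairwise Euclidean distance matrix, equivalent to looping
# np.linalg.norm(points - points[i], ord=2, axis=1) over i
dists = np.linalg.norm(points[:, None, :] - points[None, :, :], axis=-1)

# Neighbor count per point within eps, excluding the point itself
neighbor_counts = (dists <= eps).sum(axis=1) - 1

print(dict(zip(test_ids, neighbor_counts.tolist())))
# -> {5555: 0, 0: 0, 66: 2, 21: 2, 43: 1, 11: 1}
```

This reproduces the counts the test comment asserts: ids 5555 and 0 are isolated, 66 and 21 each have 2 neighbors, and 43 and 11 each have 1.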