This is an automated email from the ASF dual-hosted git repository.

domino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit dfbd29d0c6e21abeb9eef6a0968bd8fbc58b37ba
Author: Domino Valdano <dvald...@vmware.com>
AuthorDate: Fri Aug 7 15:44:10 2020 -0700

    DBSCAN: Fast parallel-optimized DBSCAN
    
    This optimized version of DBSCAN offers a dramatic improvement over
    the original brute force implementation.  It should have roughly
    O((N/S) log N) runtimes, where N is the size of the input dataset
    (number of points/rows) and S is the number of segments used.
    (Note: the algorithm now decides on its own how many segments to use,
    since sometimes splitting things up further only increases the runtime.
    Therefore S is not necessarily the total number of segments in the
    cluster; it may be less, depending on the structure of the dataset.)
    
    This borrows many aspects from the previous attempt (using
    an overlapping spatial binary tree to segment the dataset and
    using an R tree index to speed up range queries), but differs in
    its approach in other ways.
    
    The brute force DBSCAN runs in O(N^2) time. To improve on this,
    we split the data into different overlapping regions, run
    DBSCAN on each in parallel, then merge the results together
    on the coordinator. More specifically:
    
    1.  The data is segmented into connected spatial regions, using an
        in-house designed binary tree optimized specifically for DBSCAN.
        This custom DBSCAN-optimized tree is similar to a kd-tree, but tries
        to keep the child nodes of each node as balanced as possible while
        simultaneously splitting along a hyperplane which favors passing
        through the least dense regions of the space. To accomplish this, it
        constructs a coarse-grained density map of each node before deciding
        where to split, minimizing a loss function which tries to estimate
        the longest expected runtime of any segment assigned to the
        descendants of that node.
    
    2.  Each leaf of the optimized spatial tree runs the DBSCAN
        algorithm on the points in its spatial region, including
        some points from other regions near its boundaries, using
        an R-tree index for efficient range queries.
    
    3.  Merge the clusters found in each leaf with each other by keeping track
        of which points in neighboring leaves are within eps of a cluster's
        home leaf. Uses madlib's wcc graph module (weakly connected components)
        to identify the equivalence classes of clusters across all leaves
        (a rough sketch of this merge step follows).
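
        The real merge runs madlib's wcc module over a SQL edge table; as a
        rough, hypothetical Python sketch of the same idea, the equivalence
        classes could be computed with a simple union-find over the
        cross-leaf edges:

            # Illustrative sketch only.  'edges' is a hypothetical list of
            # (cluster_a, cluster_b) pairs, one for each core point whose
            # eps-neighborhood crosses a leaf boundary into another cluster.
            def merge_leaf_clusters(cluster_ids, edges):
                parent = {c: c for c in cluster_ids}

                def find(c):
                    while parent[c] != c:
                        parent[c] = parent[parent[c]]  # path compression
                        c = parent[c]
                    return c

                for a, b in edges:                     # union step
                    parent[find(a)] = find(b)

                # map each per-leaf cluster id to one global representative
                return {c: find(c) for c in cluster_ids}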
    
    ================ Partial List of Detailed Improvements ==============
    
    - Initial build_kdtree() re-write
    
        Previously, we were including a lot of points that were nowhere
        near the border as border points.  The new version should only
        include those which are actually within epsilon distance of a border.
    
        It also outputs the results in a different format that
        provides a bit more information, and will make a fast merge of
        clusters from different segments possible in a later stage.
    
        The output of build_kdtree (renamed to build_optimized_tree) is now
        a single table of annotated points instead of many tables.  The
        __leaf_id__ column has been split into two similar concepts,
        __leaf_id__ and __dist_id__.  __leaf_id__ tells you which leaf a
        point is in, and __dist_id__ tells you which (extended) leaf a row
        is in--which determines the segment on which it will be stored.
    
        Note that each point can have several entries in the segmented_output
        table.  There is not a 1-to-1 correspondence between points and rows,
        since we store each border point in multiple overlapping leaves
        (segments) at once.
    
    - Convert find_core_points() UDA to dbscan_leaf() UDF
    
    - Add dbscan_record class, for managing input and output of dbscan_leaf
       (maps to the dbscan_record SQL TYPE, but with some hidden fields added
       for keeping track of record-specific variables in python; these are
       automatically removed before yielding the final result, when the
       record gets cloned via the copy constructor)
    
    - Allow passing id_column as an expression:
        The id_column input param is now handled the same way as the
        point column, so that any expression evaluating to an integer type
        should work, instead of requiring the id to be just a single column.
        The name of the column in the output table will now always be 'id',
        but the actual expression used on the original input table is still
        stored in the summary table.
    
    - Use sqrt(eps) only for kd_tree, raw eps for dbscan_leaf
        dbscan_leaf compares a squared distance against the squared-distance
        metric, while generating the kd-tree requires an actual (unsquared)
        distance.
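
        For instance, the border width used when building the tree is derived
        from eps roughly like this (a minimal sketch paraphrasing the change
        in dbscan.py_in):

            from math import sqrt

            DEFAULT_METRIC = 'squared_dist_norm2'

            def tree_eps(eps, metric):
                # With the default metric, eps is a squared distance, so the
                # spatial border around each leaf must be sqrt(eps) wide;
                # dbscan_leaf keeps the raw eps for its squared-distance
                # comparisons.
                return sqrt(eps) if metric == DEFAULT_METRIC else eps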
    
    - Further re-write of build_kd_tree():
    
          It was taking 30s to generate the overlapping kd-tree for 10,000
        points, where we were doing many long queries iteratively in python,
        and several minutes for 100,000 points.  But this should be the fast
        part!

          Now, it generates the non-overlapping kd-tree first, then adds
        the external points for the overlap regions.  The generation of
        the regular kd-tree, along with the cutoff table, is working now and
        very fast (< 1s for 10,000 points, < 5s for 100,000 points).
        It's just a single 0.5s recursive query for generating the segmented
        source (without external point augmentation), and an even simpler 0.1s
        query afterwards for the cutoffs.
          I don't expect adding the overlap points to take much time
        either, as we already have the non-overlapping cutoffs.  We can
        generate a leaf bounds table from them (min & max boundary points
        for each leaf) without even touching the source table, and should
        be able to join that to the source table in a final query which
        scans through N/S * max_depth rows per segment (similar to the
        first two fast queries), as sketched below.
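
        A minimal sketch of that membership test (hypothetical names; the
        real version is a SQL join against the leaf bounds table): a point is
        copied into a neighboring leaf's extended region when it falls within
        eps of that leaf's bounding box.

            def in_extended_leaf(point, lower_bounds, upper_bounds, eps):
                # True if the point lies inside the leaf's bounding box grown
                # by eps in every dimension, i.e. it should be stored on that
                # leaf's segment as an external (overlap) point.
                return all(lo - eps <= x < hi + eps
                           for x, lo, hi in zip(point, lower_bounds,
                                                upper_bounds))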
    
    -  Adds print_timings debug statements, for detailed performance profiling
    
    -  Changes __dist_id__ to dist_id and adds a new __dist_id__:
       This adds another layer of abstraction to the mapping from
       leaves to dist_id's to segments, but was needed to ensure that
       the leaves are balanced among the physical cluster segments...
       otherwise, for a small number of leaves, the dist_ids map somewhat
       randomly to the segments, and several can end up on the same segment
       while other segments handle no data at all.  This always
       places leaves on the segments in a round-robin fashion rather than
       based on gpdb's internal DISTRIBUTED BY hash function.
    
    -  Move dbscan_optimized into its own class
    
    -  Fix use of wrong eps in build_kd_tree (there were cases where we
       were using eps instead of eps^2 for the squared-dist-norm).
    
    -  Add functions for optimized tree segmentation (uses a loss function
       instead of the kd-tree for making cuts); a simplified sketch follows.
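
       A much-simplified sketch of the idea (the real implementation does
       this in SQL with the __dbscan_segmentation_loss function; 'loss_fn'
       here is just a stand-in): build a coarse histogram of eps-wide bins
       along each dimension and pick the cut minimizing the estimated
       worst-case segment runtime.

           import numpy as np

           def choose_cut(points, eps, loss_fn):
               # points: (n, d) array.  For each dimension, bin the points
               # into eps-wide bins (the coarse density map), then evaluate
               # the loss of cutting at each bin edge and keep the best cut.
               best = None
               n, d = points.shape
               for i in range(d):
                   lo, hi = points[:, i].min(), points[:, i].max() + eps
                   bins = max(1, int(np.floor((hi - lo) / eps)))
                   density, edges = np.histogram(points[:, i], bins=bins,
                                                 range=(lo, hi))
                   left = np.cumsum(density)
                   for b in range(bins - 1):
                       loss = loss_fn(left[b], n - left[b])
                       if best is None or loss < best[0]:
                           best = (loss, i, edges[b + 1])
               return best   # (loss, dimension to cut along, cutoff value)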
    
    -  Skip missing dist_id's while creating dist_map:
    
       If all the dist_id's were sequential with no gaps, as with the
       kd-tree, this wouldn't be necessary.  But now, since some branches of
       the tree stop splitting before others, we need to account for these
       missing dist_id's... otherwise you can end up with several dist_id's
       on the same segment and other segments with nothing (see the sketch
       below).
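
       Roughly (a hypothetical helper in plain Python instead of the actual
       SQL), the dense re-numbering looks like:

           def build_dist_map(dist_ids, num_segs):
               # dist_ids may have gaps because some branches of the tree
               # stop splitting early; re-number them densely so the leaves
               # land on the physical segments round-robin instead of piling
               # up on whichever segments the raw ids happen to hash to.
               return {d: i % num_segs
                       for i, d in enumerate(sorted(set(dist_ids)))}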
    
    - Accelerates range_query() using vectorized numpy operations
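
        As a small illustration of the kind of change (hypothetical names):
        the per-candidate distance loop is replaced with one vectorized
        numpy computation.

            import numpy as np

            def in_range(candidates, center, eps_sq):
                # candidates: (k, d) array of points returned by the rtree
                # for the bounding box around 'center'; eps_sq is the
                # squared-distance threshold.  One vectorized operation
                # replaces k separate distance computations.
                diff = candidates - np.asarray(center)
                return np.sum(diff * diff, axis=1) <= eps_sq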
    
    - Move many functions into a self-contained DBSCANStorage class
    
      This makes things much easier to manage, with fewer params to pass around
    
    - Early removal of points from rtree, only storing minimum necessary
    
        Delete each point from rtree as soon as the first range query is run
        on it, to speed up range queries run on neighboring points.  The smaller
        the tree, the faster the results get returned.  (Searching the tree
        for a single point is O(log N), but a range query returns many
        points... so it's at *least* O(log N + k) and possibly more like
        O(k log N), where k is the number of candidate neighbors returned,
        which we have to go through one at a time to check whether they are
        actually in range.)
    
        Deleting points as soon as we label them without any further
        modifications would throw off the calculation of is_core_point.
        We can still delete them early, but to ensure is_core_point is
        right we need to keep track of the number of neighbors each internal
        point has.  Some of these neighbors are internal and others are
        external.  The sum of both determines the final value of
        is_core_point.
    
        After doing this, a lot of things we were previously doing no longer
        made sense.  Since we need to query external neighbors of the
        internal points anyway, we may as well do all of the inverse-queries
        up front instead (searching for internal neighbors of each external
        point).  In general, external points are expected to be fewer than
        internal points, so hopefully this will be faster.  But then we have
        no more use for the cluster-specific trees (unless we want to add
        them back for use by the internal points?).  range_query no longer
        has min_results or max_results params.
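
        A rough sketch of the new bookkeeping (hypothetical names; the real
        logic lives in dbscan_leaf and the dbscan_record class):

            def label_point(rec, rtree_index, eps, min_samples):
                # Query once, then drop the point from the rtree so that
                # later range queries on its neighbors search a smaller tree.
                neighbors = range_query(rtree_index, rec.point, eps)  # hypothetical helper
                rtree_index.delete(rec.id, bounding_box(rec.point))   # hypothetical bounding_box()
                # is_core_point is decided from the running count of internal
                # plus external neighbors, so it stays correct even though
                # the point is no longer in the tree.
                rec.num_neighbors += len(neighbors)
                rec.is_core_point = rec.num_neighbors >= min_samples
                return neighbors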
    
    - All examples passing, one each for 3D, 4D, 9D, 16D, 25D, 36D, 49D,
        and 64D, tested both on a 3-segment cluster and a 16-segment cluster
    
    - Change default depth to num_segs instead of log_2(num_segs)
    
        log_2(num_segs) made sense for the kd_tree, because each node of the
        tree is always split exactly in half, resulting in a balanced binary
        tree.  With the optimized tree algorithm, splits can happen anywhere:
        there can be 1 segment on the left and 50 on the right, and then later
        down the tree the 50 get split up more while the 1 does not.  In the
        worst case, one node gets split off from the rest at each cut, meaning
        that in order to populate num_segs leaves you need num_segs cuts (for
        example, with 16 segments the kd-tree only ever needed a depth of
        log_2(16) = 4, while the optimized tree may need up to 16 cuts).
        This could result in a longer optimization time, but the optimization
        phase is usually very fast compared to the actual parallel-DBSCAN
        phase that follows.  It should only matter for small datasets, and the
        user always has the option of setting a lower max_depth than the
        default if optimization is taking too much time.
    
    - Cast input to DOUBLE PRECISION[] in source_view
    
        (This is necessary to be able to handle input tables where the points
         column is REAL[], INTEGER[], etc.)
    
    - Rename depth -> max_segmentation_depth
    - Rename "kd_tree" method option to "optimized"
    - Remove unused kd_tree code
    
    - Fix a subtle issue involving early removal of border points from rtree:
    
        If range_query is called on a point in the outer for loop (where we're
        looking for a new cluster seed, not growing a cluster yet), then it
        will be tentatively labelled as NOISE_POINT... but may be assigned to
        a cluster later, if it borders a core point while we're growing a
        cluster.  Instead of keeping these points in the rtree, we go ahead
        and delete them (same as for core points or noise points, which we
        know won't need further processing), but must keep track of a list of
        _possible_border_points in each internal db_rec.  During
        update_neighbors (called by range_query), we can also add the id to
        each of its unlabelled neighbors' _possible_border_points list, in
        case they turn out to be core points later.  (None of them will be
        labelled as core points yet, since anything marked as a core point has
        already been removed from the tree; but there may be some
        NOISE_POINT's; we skip the neighboring NOISE_POINT's since we know
        they won't ever be core points.)
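
        Roughly, in hypothetical Python (the real code is in update_neighbors
        inside dbscan_leaf):

            NOISE_POINT = -1   # assumed label constant

            def remember_possible_border(rec, neighbors):
                # rec was just range-queried in the outer loop and tentatively
                # labelled NOISE_POINT.  Record its id on every unlabelled
                # neighbor, so that if that neighbor later turns out to be a
                # core point, rec can still be pulled into its cluster even
                # though rec has already been removed from the rtree.
                # Neighbors already labelled NOISE_POINT are skipped: they
                # can never become core points.
                for nb in neighbors:
                    if nb.cluster_id is None:
                        nb._possible_border_points.append(rec.id)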
    
    - Add scripts for creating blobs for perf testing
    - Add scripts for generating and running dbscan perf tests
---
 .gitignore                                         |    1 +
 src/ports/postgres/modules/dbscan/dbscan.py_in     | 1696 +++++++++++++++-----
 src/ports/postgres/modules/dbscan/dbscan.sql_in    |  379 +++--
 .../postgres/modules/dbscan/test/dbscan.sql_in     |  142 +-
 .../dbscan/test/unit_tests/test_dbscan.py_in       |  348 +++-
 5 files changed, 1971 insertions(+), 595 deletions(-)

diff --git a/.gitignore b/.gitignore
index ad9a296..2cbb50b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 # Ignore build directory
 /build*
 /build-docker*
+/src/ports/postgres/modules/dbscan/test/perf/blobs
 
 # Ignore generated code files
 *.so
diff --git a/src/ports/postgres/modules/dbscan/dbscan.py_in 
b/src/ports/postgres/modules/dbscan/dbscan.py_in
index 40e1205..00986c5 100644
--- a/src/ports/postgres/modules/dbscan/dbscan.py_in
+++ b/src/ports/postgres/modules/dbscan/dbscan.py_in
@@ -23,7 +23,7 @@ from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import unique_string
 from utilities.utilities import add_postfix
-from utilities.utilities import NUMERIC, ONLY_ARRAY
+from utilities.utilities import INTEGER, NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
 from utilities.utilities import num_features
@@ -36,10 +36,16 @@ from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
 from math import log
-from math import floor
+from math import ceil
 from math import sqrt
+from time import time
+import numpy as np
+from collections import deque
 
-from scipy.spatial import distance
+import utilities.debug as DEBUG
+DEBUG.plpy_info_enabled = False
+DEBUG.plpy_execute_enabled = False
+DEBUG.timings_enabled = False
 
 try:
     from rtree import index
@@ -48,323 +54,219 @@ except ImportError:
 else:
     RTREE_ENABLED=1
 
-BRUTE_FORCE = 'brute_force'
-KD_TREE = 'kd_tree'
+METHOD_BRUTE_FORCE = 'brute_force'
+METHOD_OPTIMIZED = 'optimized'
 DEFAULT_MIN_SAMPLES = 5
-DEFAULT_KD_DEPTH = 3
 DEFAULT_METRIC = 'squared_dist_norm2'
 
 def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
-           eps, min_samples, metric, algorithm, depth, **kwargs):
+           eps, min_samples, metric, algorithm, max_segmentation_depth, 
**kwargs):
 
     with MinWarning("warning"):
-
+        # algorithm=None is handled in get_algorithm_name()
         min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
         metric = DEFAULT_METRIC if not metric else metric
-        algorithm = BRUTE_FORCE if not algorithm else algorithm
-        depth = DEFAULT_KD_DEPTH if not depth else depth
+        num_segs = get_seg_number()
+
+        algorithm = get_algorithm_name(algorithm, METHOD_OPTIMIZED,
+            [METHOD_BRUTE_FORCE, METHOD_OPTIMIZED], 'DBSCAN')
 
-        algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
-            [BRUTE_FORCE, KD_TREE], 'DBSCAN')
+        if max_segmentation_depth is None:
+            # Default to num_segs
+            max_depth = num_segs
+        else:
+            max_depth = max_segmentation_depth
+            if algorithm != METHOD_OPTIMIZED:
+                plpy.warning("Ignoring max_segmentation_depth={} param, "
+                          "N/A to algorithm={}".format(max_depth, algorithm))
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm, 
depth)
+                         expr_point, eps, min_samples, metric, algorithm, 
max_depth)
 
-        dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
-        dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
({0})'.format(id_column)
-        dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
(__reachable_id__)'
-        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
(__leaf_id__)'
+        dist_by_src = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
+        dist_by_id = ''  if is_platform_pg() else 'DISTRIBUTED BY (id)'
+        dist_by_reach_id = ''  if is_platform_pg() else 'DISTRIBUTED BY 
(__reachable_id__)'
+        dist_by_seg_id = '' if is_platform_pg() else 'DISTRIBUTED BY (seg_id)'
+        distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY 
(__dist_id__)'
 
         core_points_table = unique_string(desp='core_points_table')
         core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
-            core_points_table, core_edge_table, distance_table))
 
         source_view = unique_string(desp='source_view')
         plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
             CREATE VIEW {source_view} AS
-            SELECT {id_column}, {expr_point} AS __expr_point__
+            SELECT ({id_column})::BIGINT AS id, ({expr_point})::DOUBLE 
PRECISION[] AS point
             FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
-        expr_point = '__expr_point__'
-
-        if algorithm == KD_TREE:
-            cur_source_table, border_table1, border_table2 = dbscan_kd(
-                schema_madlib, source_view, id_column, expr_point, eps,
-                min_samples, metric, depth)
-
-            sql = """
-                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY 
__leaf_id__
-                """.format(**locals())
-            result = plpy.execute(sql)
-            rt_counts_dict = {}
-            for i in result:
-                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
-            rt_counts_list = []
-            for i in sorted(rt_counts_dict):
-                rt_counts_list.append(rt_counts_dict[i])
-
-            find_core_points_table = 
unique_string(desp='find_core_points_table')
-            rt_edge_table = unique_string(desp='rt_edge_table')
-            rt_core_points_table = unique_string(desp='rt_core_points_table')
-            border_core_points_table = 
unique_string(desp='border_core_points_table')
-            border_edge_table = unique_string(desp='border_edge_table')
-            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
-                find_core_points_table, rt_edge_table, rt_core_points_table,
-                border_core_points_table, border_edge_table))
-
-            sql = """
-            CREATE TABLE {find_core_points_table} AS
-            SELECT __leaf_id__,
-                   {schema_madlib}.find_core_points( {id_column},
-                                               {expr_point}::DOUBLE 
PRECISION[],
-                                               {eps},
-                                               {min_samples},
-                                               '{metric}',
-                                               ARRAY{rt_counts_list},
-                                               __leaf_id__
-                                               )
-            FROM {cur_source_table} GROUP BY __leaf_id__
-            {dist_leaf_sql}
-            """.format(**locals())
-            plpy.execute(sql)
-
-            sql = """
-            CREATE TABLE {rt_edge_table} AS
-            SELECT (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__
-            FROM (
-                SELECT {schema_madlib}.unpack_2d(find_core_points) AS 
unpacked_2d
-                FROM {find_core_points_table}
-                ) q1
-            WHERE (unpacked_2d).src NOT IN (SELECT {id_column} FROM 
{border_table1})
-            {dist_src_sql}
-            """.format(**locals())
-            plpy.execute(sql)
-
-            sql = """
-                CREATE TABLE {rt_core_points_table} AS
-                SELECT DISTINCT(__src__) AS {id_column} FROM {rt_edge_table}
-                """.format(**locals())
-            plpy.execute(sql)
-
-            # # Start border
-            sql = """
-                CREATE TABLE {border_edge_table} AS
-                SELECT __src__, __dest__ FROM (
-                    SELECT  __t1__.{id_column} AS __src__,
-                            __t2__.{id_column} AS __dest__,
-                            {schema_madlib}.{metric}(
-                                __t1__.{expr_point}, __t2__.{expr_point}) AS 
__dist__
-                    FROM {border_table1} AS __t1__, {border_table2} AS 
__t2__)q1
-                WHERE __dist__ < {eps}
-                """.format(**locals())
-            plpy.execute(sql)
-
-            sql = """
-                CREATE TABLE {border_core_points_table} AS
-                SELECT * FROM (
-                    SELECT __src__ AS {id_column}, count(*) AS __count__
-                    FROM {border_edge_table} GROUP BY __src__) q1
-                WHERE __count__ >= {min_samples}
-                {dist_id_sql}
-                """.format(**locals())
-            plpy.execute(sql)
-
-            # Build common tables
-            sql = """
-                CREATE TABLE {distance_table} AS
-                SELECT * FROM {rt_edge_table}
-                UNION
-                SELECT * FROM {border_edge_table}
-                """.format(**locals())
-            plpy.execute(sql)
-            sql = """
-                CREATE TABLE {core_points_table} AS
-                SELECT {id_column} FROM {border_core_points_table}
-                UNION
-                SELECT {id_column} FROM {rt_core_points_table}
-                """.format(**locals())
-            plpy.execute(sql)
-
-            sql = """
-                CREATE TABLE {core_edge_table} AS
-                SELECT __t1__.__src__, __t1__.__dest__
-                FROM {rt_edge_table} __t1__ ,
-                    (SELECT array_agg({id_column}) AS arr
-                     FROM {core_points_table}) __t2__
-                WHERE  __t1__.__dest__ = ANY(arr)
-                UNION
-                SELECT __t1__.__src__, __t1__.__dest__
-                FROM {border_edge_table} __t1__ ,
-                    (SELECT array_agg({id_column}) AS arr
-                     FROM {core_points_table}) __t2__
-                WHERE  __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr)
-                """.format(**locals())
-            plpy.execute(sql)
-
-            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}, 
{6}, {7}".format(
-                find_core_points_table, rt_edge_table, rt_core_points_table,
-                border_core_points_table, border_edge_table,
-                cur_source_table, border_table1, border_table2))
 
+        if algorithm == METHOD_OPTIMIZED:
+            dbscan_opt = DBSCAN_optimized(schema_madlib, source_view, 
output_table,
+                                          eps, min_samples, metric, max_depth)
+            dbscan_opt.run()
         else:
-
             # Calculate pairwise distances
             sql = """
-                CREATE TABLE {distance_table} AS
+                CREATE TEMP TABLE {distance_table} AS
                 SELECT __src__, __dest__ FROM (
-                    SELECT  __t1__.{id_column} AS __src__,
-                            __t2__.{id_column} AS __dest__,
+                    SELECT  __t1__.id AS __src__,
+                            __t2__.id AS __dest__,
                             {schema_madlib}.{metric}(
-                                __t1__.{expr_point}, __t2__.{expr_point}) AS 
__dist__
+                                __t1__.point, __t2__.point) AS __dist__
                     FROM {source_view} AS __t1__, {source_view} AS __t2__)q1
                 WHERE __dist__ < {eps}
-                {dist_src_sql}
+                {dist_by_src}
                 """.format(**locals())
             plpy.execute(sql)
 
             # Find core points
             sql = """
-                CREATE TABLE {core_points_table} AS
+                CREATE TEMP TABLE {core_points_table} AS
                 SELECT * FROM (
-                    SELECT __src__ AS {id_column}, count(*) AS __count__
+                    SELECT __src__ AS id, count(*) AS __count__
                     FROM {distance_table} GROUP BY __src__) q1
                 WHERE __count__ >= {min_samples}
-                {dist_id_sql}
+                {dist_by_id}
                 """.format(**locals())
             plpy.execute(sql)
 
             # Find the connections between core points to form the clusters
             sql = """
-                CREATE TABLE {core_edge_table} AS
+                CREATE TEMP TABLE {core_edge_table} AS
                 SELECT __src__, __dest__
-                FROM {distance_table} AS __t1__, (SELECT 
array_agg({id_column}) AS arr
+                FROM {distance_table} AS __t1__, (SELECT array_agg(id) AS arr
                                                   FROM {core_points_table}) 
__t2__
                 WHERE __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr)
-                {dist_src_sql}
+                {dist_by_src}
             """.format(**locals())
             plpy.execute(sql)
 
-        sql = """
-            SELECT count(*) FROM {core_points_table}
-            """.format(**locals())
-        core_count = plpy.execute(sql)[0]['count']
-        _assert(core_count != 0, "DBSCAN: Cannot find any core 
points/clusters.")
-
-        # Start snowflake creation
-        if is_platform_pg():
             sql = """
-                SELECT count(*) FROM {core_edge_table}
+                SELECT count(*) FROM {core_points_table}
                 """.format(**locals())
-            count = plpy.execute(sql)[0]['count']
+            core_count = plpy.execute(sql)[0]['count']
+            _assert(core_count != 0, "DBSCAN: Cannot find any core 
points/clusters.")
 
-            counts_list = [int(count)]
-            seg_id = 0
-            group_by_clause = ''
-            dist_clause = ''
+            # Start snowflake creation
+            if is_platform_pg():
+                sql = """
+                    SELECT count(*) FROM {core_edge_table}
+                    """.format(**locals())
+                count = plpy.execute(sql)[0]['count']
 
-        else:
-            sql = """
-                SELECT count(*), gp_segment_id FROM {core_edge_table} GROUP BY 
gp_segment_id
-                """.format(**locals())
-            count_result = plpy.execute(sql)
-            seg_num = get_seg_number()
-            counts_list = [0]*seg_num
-            for i in count_result:
-                counts_list[int(i['gp_segment_id'])] = int(i['count'])
-            seg_id = 'gp_segment_id'
-            group_by_clause = 'GROUP BY gp_segment_id'
-            dist_clause = 'DISTRIBUTED BY (seg_id)'
-
-        snowflake_table = unique_string(desp='snowflake_table')
-        sf_edge_table = unique_string(desp='sf_edge_table')
-
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format(
-            snowflake_table, sf_edge_table))
-        sql = """
-            CREATE TABLE {snowflake_table} AS
-            SELECT {seg_id}::INTEGER AS seg_id,
-                   {schema_madlib}.build_snowflake_table( __src__,
-                                            __dest__,
-                                            ARRAY{counts_list},
-                                            {seg_id}
-                                           ) AS __sf__
-            FROM {core_edge_table} {group_by_clause}
-            {dist_clause}
-        """.format(**locals())
-        plpy.execute(sql)
+                counts_list = [int(count)]
+                seg_id = 0
+                group_by_clause = ''
 
-        sql = """
-            CREATE TABLE {sf_edge_table} AS
-            SELECT seg_id, (unpacked_2d).src AS __src__, (unpacked_2d).dest AS 
__dest__
-            FROM (
-                SELECT seg_id,
-                       {schema_madlib}.unpack_2d(__sf__) AS unpacked_2d
-                FROM {snowflake_table}
-                ) q1
-            {dist_clause}
+            else:
+                sql = """
+                    SELECT count(*), gp_segment_id FROM {core_edge_table} 
GROUP BY gp_segment_id
+                    """.format(**locals())
+                count_result = plpy.execute(sql)
+                seg_num = get_seg_number()
+                counts_list = [0]*seg_num
+                for i in count_result:
+                    counts_list[int(i['gp_segment_id'])] = int(i['count'])
+                seg_id = 'gp_segment_id'
+                group_by_clause = 'GROUP BY gp_segment_id'
+
+            snowflake_table = unique_string(desp='snowflake_table')
+            sf_edge_table = unique_string(desp='sf_edge_table')
+
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format(
+                snowflake_table, sf_edge_table))
+            sql = """
+                CREATE TEMP TABLE {snowflake_table} AS
+                SELECT {seg_id}::INTEGER AS seg_id,
+                       {schema_madlib}.build_snowflake_table( __src__,
+                                                __dest__,
+                                                ARRAY{counts_list},
+                                                {seg_id}
+                                               ) AS __sf__
+                FROM {core_edge_table} {group_by_clause}
+                {dist_by_seg_id}
             """.format(**locals())
-        plpy.execute(sql)
+            plpy.execute(sql)
 
-        # Run wcc to get the min id for each cluster
-        wcc(schema_madlib, core_points_table, id_column, sf_edge_table,
-            'src=__src__, dest=__dest__', output_table, None)
+            sql = """
+                CREATE TEMP TABLE {sf_edge_table} AS
+                SELECT seg_id, (edge).src AS __src__, (edge).dest AS __dest__
+                FROM (
+                    SELECT seg_id,
+                           {schema_madlib}.__dbscan_unpack_edge(__sf__) AS edge
+                    FROM {snowflake_table}
+                    ) q1
+                {dist_by_seg_id}
+                """.format(**locals())
+            plpy.execute(sql)
 
-        plpy.execute("""
-            ALTER TABLE {0}
-            ADD COLUMN is_core_point BOOLEAN,
-            ADD COLUMN __points__ DOUBLE PRECISION[]
-            """.format(output_table))
-        plpy.execute("""
-            ALTER TABLE {0}
-            RENAME COLUMN component_id TO cluster_id
-            """.format(output_table))
-        plpy.execute("""
-            UPDATE {0}
-            SET is_core_point = TRUE
-        """.format(output_table))
+            # Run wcc to get the min id for each cluster
+            wcc(schema_madlib, core_points_table, 'id', sf_edge_table,
+                'src=__src__, dest=__dest__', output_table, None)
+
+            plpy.execute("""
+                ALTER TABLE {0}
+                ADD COLUMN is_core_point BOOLEAN,
+                ADD COLUMN point DOUBLE PRECISION[];
+                """.format(output_table)
+            )
+            plpy.execute("""
+                ALTER TABLE {0}
+                RENAME COLUMN component_id TO cluster_id;
+                """.format(output_table)
+            )
+            plpy.execute("""
+                UPDATE {0}
+                SET is_core_point = TRUE
+                """.format(output_table)
+            )
+
+            # Find reachable points
+            reachable_points_table = 
unique_string(desp='reachable_points_table')
+            plpy.execute("DROP TABLE IF EXISTS 
{0}".format(reachable_points_table))
+            sql = """
+                CREATE TEMP TABLE {reachable_points_table} AS
+                    SELECT array_agg(__src__) AS __src_list__,
+                           __dest__ AS __reachable_id__
+                    FROM {distance_table} AS __t1__,
+                         (SELECT array_agg(id) AS __arr__
+                          FROM {core_points_table}) __t2__
+                    WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__)
+                    GROUP BY __dest__
+                    {dist_by_reach_id}
+                """.format(**locals())
+            plpy.execute(sql)
 
-        # Find reachable points
-        reachable_points_table = unique_string(desp='reachable_points_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(reachable_points_table))
-        sql = """
-            CREATE TABLE {reachable_points_table} AS
-                SELECT array_agg(__src__) AS __src_list__,
-                       __dest__ AS __reachable_id__
-                FROM {distance_table} AS __t1__,
-                     (SELECT array_agg({id_column}) AS __arr__
-                      FROM {core_points_table}) __t2__
-                WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__)
-                GROUP BY __dest__
-                {dist_reach_sql}
-            """.format(**locals())
-        plpy.execute(sql)
+            sql = """
+                INSERT INTO {output_table}
+                SELECT  __reachable_id__,
+                        cluster_id,
+                        FALSE AS is_core_point,
+                        NULL AS point
+                FROM {reachable_points_table} AS __t1__ JOIN
+                     {output_table} AS __t2__
+                     ON (__src_list__[1] = id)
+                """.format(**locals())
+            plpy.execute(sql)
 
-        sql = """
-            INSERT INTO {output_table}
-            SELECT  __reachable_id__ as {id_column},
-                    cluster_id,
-                    FALSE AS is_core_point,
-                    NULL AS __points__
-            FROM {reachable_points_table} AS __t1__ INNER JOIN
-                 {output_table} AS __t2__
-                 ON (__src_list__[1] = {id_column})
+            # Add features of points to the output table to use them for 
prediction
+            sql = """
+                UPDATE {output_table} AS __t1__
+                SET point = __t2__.point
+                FROM {source_view} AS __t2__
+                WHERE __t1__.id = __t2__.id
             """.format(**locals())
-        plpy.execute(sql)
+            plpy.execute(sql)
 
-        # Add features of points to the output table to use them for prediction
-        sql = """
-            UPDATE {output_table} AS __t1__
-            SET __points__ = {expr_point}
-            FROM {source_view} AS __t2__
-            WHERE __t1__.{id_column} = __t2__.{id_column}
-        """.format(**locals())
-        plpy.execute(sql)
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, 
{5}".format(
+                             distance_table, core_points_table, 
core_edge_table,
+                             reachable_points_table, snowflake_table, 
sf_edge_table))
 
-        # Update the cluster ids to be consecutive
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
+
+        # -- Update the cluster ids to be consecutive --
         sql = """
             UPDATE {output_table} AS __t1__
             SET cluster_id = new_id-1
@@ -376,127 +278,658 @@ def dbscan(schema_madlib, source_table, output_table, 
id_column, expr_point,
         """.format(**locals())
         plpy.execute(sql)
 
+        # -- Generate output summary table ---
+
         output_summary_table = add_postfix(output_table, '_summary')
 
-        # Drop the summary table from wcc
         plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
         sql = """
             CREATE TABLE {output_summary_table} AS
-            SELECT  '{id_column}'::VARCHAR AS id_column,
+            SELECT  $${id_column}$$::VARCHAR AS id_column,
+                    $${expr_point}$$::VARCHAR AS expr_point,
                     {eps}::DOUBLE PRECISION AS eps,
+                    {min_samples}::BIGINT AS min_points,
                     '{metric}'::VARCHAR AS metric
             """.format(**locals())
         plpy.execute(sql)
 
-        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, 
{5}".format(
-                     distance_table, core_points_table, core_edge_table,
-                     reachable_points_table, snowflake_table, sf_edge_table))
-
-def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
-              min_samples, metric, depth):
-
-    n_features = num_features(source_table, expr_point)
-
-    # If squared_dist_norm2 is used, we assume eps is set for the squared 
distance
-    # That means the border only needs to be sqrt(eps) wide
-    local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
-
-    kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
-        schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+class DBSCAN(object):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric):
+        """
+            Args:
+                @param schema_madlib        Name of the Madlib Schema
+                @param source_table         Training data table
+                @param output_table         Output table of points labelled by 
cluster_id
+                @param eps                  The eps value defined by the user
+                @param min_samples          min_samples defined by user
+                @param metric               metric selected by user
+
+        """
+        self.schema_madlib = schema_madlib
+        self.source_table = source_table
+        self.output_table = output_table
+        self.n_dims = num_features(source_table, 'point')
+        self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY 
(__dist_id__)'
+        self.min_samples = min_samples
+        self.metric = metric
+        # If squared_dist_norm2 is used, we assume eps is set for the squared
+        # distance.  That means the border width must be sqrt(eps)
+        self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+class DBSCAN_optimized(DBSCAN):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric, max_depth=None):
+        """
+            Args:
+                @param schema_madlib        Name of the Madlib Schema
+                @param source_table         Training data table
+                @param output_table         Output table of points labelled by 
cluster_id
+                @param eps                  The eps value defined by the user
+                @param min_samples          min_samples defined by user
+                @param metric               metric selected by user
+        """
+
+        DBSCAN.__init__(self, schema_madlib, source_table, output_table,
+                        eps, min_samples, metric)
+        self.max_depth = max_depth
+        self.unique_str = unique_string('DBSCAN_opt')
+        self.num_segs = get_seg_number()
+        res = plpy.execute("SELECT COUNT(*) FROM 
{source_table}".format(**locals()))
+        self.num_points = res[0]['count']
+
+    def run(self):
+        unique_str = self.unique_str
+        distributed_by = self.distributed_by
+        schema_madlib = self.schema_madlib
+
+        self.dist_map = 'dist_map' + unique_str
+
+        segmented_source = 'segmented_source' + unique_str
+        self.segmented_source = segmented_source
+        self.build_optimized_tree()
+
+        rowcount_table = 'rowcounts' + unique_str
+        leaf_clusters_table = 'leaf_clusters' + unique_str
+
+        count_rows_sql = """
+            DROP TABLE IF EXISTS {rowcount_table};
+            CREATE TEMP TABLE {rowcount_table} AS
+            WITH i AS (
+                SELECT
+                    count(*) AS num_rows,
+                    dist_id,
+                    __dist_id__
+                FROM internal_{segmented_source}
+                GROUP BY dist_id, __dist_id__
+            ), e AS (
+                SELECT
+                    count(*) AS num_rows,
+                    dist_id,
+                    __dist_id__
+                FROM external_{segmented_source}
+                GROUP BY dist_id, __dist_id__
+            )
+            SELECT
+                i.num_rows AS num_internal_rows,
+                COALESCE(e.num_rows,0) AS num_external_rows,
+                i.dist_id,
+                __dist_id__
+            FROM i LEFT JOIN e
+                USING (dist_id, __dist_id__)
+            {distributed_by}
+            """.format(**locals())
+        plpy.execute(count_rows_sql)
+
+        # Run dbscan in parallel on each leaf
+        dbscan_leaves_sql = """
+            CREATE TEMP TABLE {leaf_clusters_table} AS
+            WITH input AS (
+                    SELECT *
+                    FROM internal_{segmented_source}
+                UNION ALL
+                    SELECT *
+                    FROM external_{segmented_source}
+            ), segmented_source AS (
+                SELECT
+                    ROW(
+                        id,
+                        point,
+                        FALSE,
+                        0,
+                        leaf_id,
+                        dist_id
+                    )::{schema_madlib}.__dbscan_record AS rec,
+                    __dist_id__
+                FROM input
+            )
+            SELECT (output).*, __dist_id__ FROM (
+                SELECT
+                    {schema_madlib}.__dbscan_leaf(
+                                                  rec,
+                                                  {self.eps},
+                                                  {self.min_samples},
+                                                  '{self.metric}',
+                                                  num_internal_rows,
+                                                  num_external_rows
+                                                 ) AS output,
+                    __dist_id__
+                FROM segmented_source JOIN {rowcount_table}
+                    USING (__dist_id__)
+            ) a {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True)
 
-    kd_source_table = unique_string(desp='kd_source_table')
-    kd_border_table1 = unique_string(desp='kd_border_table1')
-    kd_border_table2 = unique_string(desp='kd_border_table2')
+        plpy.execute("""
+            DROP TABLE IF EXISTS
+            internal_{segmented_source},
+            external_{segmented_source}
+        """.format(**locals()))
+
+        cluster_map = 'cluster_map' + unique_str
+        noise_point = label.NOISE_POINT
+
+        # Replace local cluster_id's in each leaf with
+        #   global cluster_id's which are unique across all leaves
+        gen_cluster_map_sql = """
+            CREATE TEMP TABLE {cluster_map} AS
+                SELECT
+                    dist_id,
+                    __dist_id__,
+                    cluster_id AS local_id,
+                    ROW_NUMBER() OVER(
+                        ORDER BY dist_id, cluster_id
+                    ) AS global_id
+                FROM {leaf_clusters_table}
+                    WHERE dist_id = leaf_id AND cluster_id != {noise_point}
+                GROUP BY __dist_id__, dist_id, cluster_id
+                {distributed_by}
+        """.format(**locals())
+        plpy.execute(gen_cluster_map_sql)
+
+        globalize_cluster_ids_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = global_id
+            FROM {cluster_map} cm WHERE
+                cm.dist_id = lc.dist_id AND
+                cluster_id = local_id;
+        """.format(**locals())
+        DEBUG.plpy.execute(globalize_cluster_ids_sql)
+
+        intercluster_edges = 'intercluster_edges' + unique_str
+        dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)'
+
+        # Build intercluster table of edges connecting clusters
+        #  on different leaves
+        intercluster_edge_sql = """
+            CREATE TEMP TABLE {intercluster_edges} AS
+                SELECT
+                    internal.cluster_id AS src,
+                    external.cluster_id AS dest
+                FROM {leaf_clusters_table} internal
+                JOIN {leaf_clusters_table} external
+                    USING (id)
+                WHERE internal.is_core_point
+                    AND internal.leaf_id  = internal.dist_id
+                    AND external.leaf_id != external.dist_id
+                GROUP BY src,dest
+                {dist_by_src}
+        """.format(**locals())
+        DEBUG.plpy.execute(intercluster_edge_sql)
+
+        res = plpy.execute("SELECT count(*) FROM 
{intercluster_edges}".format(**locals()))
+
+        if int(res[0]['count']) > 0:
+            wcc_table = 'wcc' + unique_str
+            plpy.execute("""
+            DROP TABLE IF EXISTS
+                {wcc_table}, {wcc_table}_summary,
+                {self.output_table}
+            """.format(**locals()))
+
+            # Find connected components of intercluster_edge_table
+            wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges,
+            'src=src,dest=dest', wcc_table, None)
+
+           # Rename each cluster_id to its component id in the intercluster 
graph
+            merge_clusters_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = wcc.component_id
+            FROM {wcc_table} wcc
+                WHERE lc.cluster_id = wcc.global_id
+            """.format(**locals())
+            plpy.execute(merge_clusters_sql)
 
-    dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
-    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, 
kd_border_table1, kd_border_table2))
+            plpy.execute("""
+            DROP TABLE IF EXISTS
+                {wcc_table}, {wcc_table}_summary
+            """.format(**locals()))
 
-    output_sql = """
-        CREATE TABLE {kd_source_table} AS
-            SELECT *,
-                   CASE {case_when_clause} END AS __leaf_id__
-            FROM {source_table}
-            {dist_leaf_sql}
+        add_crossborder_points_sql = """
+            CREATE TABLE {self.output_table} AS
+            SELECT
+                id, point, cluster_id, is_core_point
+            FROM (
+                SELECT
+                    id,
+                    point,
+                    is_core_point,
+                    leaf_id = dist_id AS is_internal,
+                    dist_id,
+                    CASE WHEN cluster_id = {noise_point}
+                        THEN MAX(cluster_id) OVER(PARTITION BY id)
+                        ELSE cluster_id
+                    END
+                FROM {leaf_clusters_table}
+            ) x WHERE is_internal AND cluster_id != {noise_point}
         """.format(**locals())
-    plpy.execute(output_sql)
+        DEBUG.plpy.execute(add_crossborder_points_sql)
 
-    border_sql = """
-        CREATE TABLE {kd_border_table1} AS
-            SELECT *
-            FROM {source_table}
-            WHERE {border_cl1}
+        plpy.execute("""
+            DROP TABLE IF EXISTS {rowcount_table},
+            {leaf_clusters_table}, {cluster_map},
+            {intercluster_edges}
+        """.format(**locals()))
+
+    def build_optimized_tree(self):
+        eps = self.eps
+        num_segs = self.num_segs
+        num_points = self.num_points
+        target = num_points / num_segs
+        source_table = self.source_table
+        leaf_bounds_table = 'leaf_bounds' + self.unique_str
+        max_depth = self.max_depth
+        n_dims = self.n_dims
+        dist_map = self.dist_map
+        distributed_by = self.distributed_by
+        internal_points = 'internal_' + self.segmented_source
+        external_points = 'external_' + self.segmented_source
+
+        next_cut = 'next_cut' + self.unique_str
+        prev_node = 'prev_node' + self.unique_str
+
+        first_cut_sql = """
+            DROP TABLE IF EXISTS {prev_node};
+            CREATE TEMP TABLE {prev_node} AS
+            WITH world_bounds AS (
+                SELECT
+                    i,
+                    min(point[i]) AS lower,
+                    max(point[i])+{eps} AS upper,
+                    max(point[i])+{eps} - min(point[i]) AS size,
+                    floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT 
AS eps_bins
+                FROM generate_series(1,{n_dims}) AS i, {source_table}
+                GROUP BY i
+            ),
+            density_map AS (
+                SELECT i, eps_bin,
+                    eps_bins,
+                    COALESCE(density, 0) AS density
+                FROM (
+                    SELECT i, eps_bins,
+                        generate_series(0, eps_bins - 1) AS eps_bin
+                    FROM world_bounds
+                ) g LEFT JOIN (
+                    SELECT i,
+                        floor((point[i] - lower)/{eps})::INT AS eps_bin,
+                        eps_bins,
+                        COUNT(*) AS density
+                    FROM
+                        world_bounds,
+                        {source_table}
+                    GROUP BY i, eps_bin, eps_bins
+                ) a
+                USING (i, eps_bin, eps_bins)
+            ),
+            loss_table AS (
+                SELECT i, eps_bin,
+                    left_internal,
+                    {num_points} as points_in_node,
+                    left_external,
+                    right_external,
+                    {self.schema_madlib}.__dbscan_segmentation_loss(
+                        left_internal,
+                        {num_points} - left_internal,
+                        left_external,
+                        right_external,
+                        {num_segs}::BIGINT,
+                        V,
+                        {eps}::DOUBLE PRECISION,
+                        {n_dims}::BIGINT,
+                        eps_bin::DOUBLE PRECISION,
+                        eps_bins::DOUBLE PRECISION
+                    ) AS losses
+                FROM (
+                    SELECT i, eps_bin, eps_bins,
+                        {num_segs}::BIGINT AS num_segs,
+                        COALESCE(density, 0) AS right_external,
+                        COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY 
eps_bin), 0) AS left_external,
+                        SUM(density) OVER(PARTITION BY i ORDER BY 
eps_bin)::BIGINT AS left_internal
+                    FROM (
+                        SELECT i, generate_series(0, eps_bins - 1) AS eps_bin 
FROM world_bounds
+                    ) g LEFT JOIN density_map USING(i, eps_bin)
+                ) params,
+                (
+                    SELECT {self.schema_madlib}.prod(size) AS V
+                    FROM world_bounds
+                ) AS volume
+            ),
+            optimize AS (
+                SELECT i, lower, cutoff, upper,
+                       left_avail, right_avail,
+                       left_internal, right_internal
+                FROM (
+                    SELECT
+                        i,
+                       CASE WHEN (losses).right_avail = 0
+                          THEN
+                               points_in_node
+                           ELSE
+                               left_internal
+                        END AS left_internal,
+                       CASE WHEN (losses).right_avail = 0
+                          THEN
+                               0::BIGINT
+                           ELSE
+                               points_in_node - left_internal
+                        END AS right_internal,
+                        (losses).left_avail AS left_avail,
+                        (losses).right_avail AS right_avail,
+                        GREATEST((losses).left_loss,(losses).right_loss) AS 
total_loss,
+                        min(
+                            GREATEST((losses).left_loss,(losses).right_loss)
+                        ) OVER() AS total_min,
+                        wb.lower,
+                        CASE WHEN (losses).right_avail = 0
+                        THEN
+                            wb.upper
+                        ELSE
+                            wb.lower + {eps}*(eps_bin+1)
+                        END AS cutoff,
+                        wb.upper
+                    FROM loss_table JOIN world_bounds wb USING(i)
+                ) a WHERE total_loss = total_min LIMIT 1
+            )
+            SELECT s.id,
+                ARRAY[i] AS coords,
+                CASE WHEN s.point[i] < cutoff
+                    THEN left_avail
+                    ELSE right_avail
+                END AS avail_segs,
+                CASE WHEN point[i] < cutoff
+                    THEN left_internal
+                    ELSE right_internal
+                END AS points_in_node,
+                CASE WHEN s.point[i] < cutoff
+                    THEN ARRAY[lower]
+                    ELSE ARRAY[cutoff]
+                END AS lower_bounds,
+                CASE WHEN s.point[i] < cutoff
+                    THEN ARRAY[cutoff]
+                    ELSE ARRAY[upper]
+                END AS upper_bounds,
+                (s.point[i] >= cutoff)::INT AS node_id
+            FROM optimize, {source_table} s
         """.format(**locals())
-    plpy.execute(border_sql)
-
-    border_sql = """
-        CREATE TABLE {kd_border_table2} AS
-            SELECT *
+        plpy.execute(first_cut_sql)
+
+        segment_source_sql = """
+            CREATE TEMP TABLE {next_cut} AS
+            WITH node_bounds AS (
+                SELECT
+                    i,
+                    node_id,
+                    avail_segs,
+                    points_in_node,
+                    min(point[i]) AS lower,
+                    max(point[i])+{eps} AS upper,
+                    max(point[i])+{eps} - min(point[i]) AS size,
+                    floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT 
AS eps_bins
+                FROM generate_series(1, {n_dims}) AS i,
+                    {prev_node} JOIN {source_table} USING (id)
+                GROUP BY 1,2,3,4
+            ), density_map AS (
+                SELECT i, node_id, eps_bin,
+                    COALESCE( density, 0) AS density
+                FROM (
+                    SELECT node_id, i,
+                        generate_series(0, eps_bins - 1) AS eps_bin
+                    FROM node_bounds
+                ) g LEFT JOIN (
+                    SELECT node_id, i,
+                        floor((point[i] - lower)/{eps})::INT AS eps_bin,
+                        COUNT(*) AS density
+                    FROM
+                        node_bounds JOIN {prev_node} USING (node_id)
+                        JOIN {source_table} USING (id)
+                    GROUP BY node_id, i, eps_bin
+                ) a
+                USING(i, node_id, eps_bin)
+            ), params AS (
+                SELECT i, node_id, eps_bin, eps_bins,
+                    COALESCE(density, 0) AS right_external,
+                    COALESCE(LEAD(density) OVER(PARTITION BY node_id,i ORDER 
BY eps_bin), 0) AS left_external,
+                    SUM(density) OVER(PARTITION BY node_id,i ORDER BY 
eps_bin)::BIGINT AS left_internal
+                FROM (
+                    SELECT i, node_id, eps_bins, generate_series(0, eps_bins - 
1) AS eps_bin FROM node_bounds
+                ) g LEFT JOIN density_map USING(i, node_id, eps_bin)
+            ), volume AS (
+                SELECT
+                    node_id,
+                    avail_segs,
+                    points_in_node,
+                    {self.schema_madlib}.prod(size) AS V
+                FROM node_bounds
+                WHERE i=1
+                GROUP BY 1,2,3
+            ), loss_table AS (
+                SELECT i, node_id, eps_bin,
+                    left_internal,
+                    points_in_node - left_internal AS right_internal,
+                    left_external,
+                    right_external,
+                    {self.schema_madlib}.__dbscan_segmentation_loss(
+                        left_internal,
+                        points_in_node,
+                        left_external,
+                        right_external,
+                        avail_segs,
+                        V,
+                        {eps}::DOUBLE PRECISION,
+                        {n_dims}::BIGINT,
+                        eps_bin::DOUBLE PRECISION,
+                        eps_bins::DOUBLE PRECISION
+                    ) AS losses
+                FROM params JOIN volume USING (node_id)
+            ), optimize AS (
+                SELECT * FROM (
+                    SELECT
+                        node_id,
+                        i,
+                        (losses).left_avail,
+                        (losses).right_avail,
+                        CASE WHEN (losses).right_avail = 0
+                            THEN points_in_node
+                            ELSE left_internal
+                        END AS left_internal,
+                        CASE WHEN (losses).right_avail = 0
+                            THEN 0::BIGINT
+                            ELSE points_in_node - left_internal
+                        END AS right_internal,
+                        node_bounds.lower,
+                        CASE WHEN (losses).right_avail = 0
+                            THEN node_bounds.upper
+                            ELSE node_bounds.lower + {eps}*(eps_bin+1)
+                        END AS cutoff,
+                        node_bounds.upper,
+                        ROW_NUMBER() OVER(
+                            PARTITION BY node_id
+                            ORDER BY GREATEST((losses).left_loss, (losses).right_loss)
+                        ) AS loss_rank
+                    FROM loss_table JOIN node_bounds USING(i, node_id)
+                ) a WHERE loss_rank = 1
+            )
+            SELECT id, coords || i AS coords,
+                CASE WHEN point[i] < opt.cutoff
+                    THEN left_avail
+                    ELSE right_avail
+                END AS avail_segs,
+                CASE WHEN point[i] < opt.cutoff
+                    THEN left_internal
+                    ELSE right_internal
+                END AS points_in_node,
+                lower_bounds ||
+                    CASE WHEN point[i] < opt.cutoff
+                    THEN
+                        lower
+                    ELSE
+                        opt.cutoff
+                    END AS lower_bounds,
+                    opt.cutoff,
+                upper_bounds ||
+                    CASE WHEN point[i] < opt.cutoff
+                    THEN
+                        opt.cutoff
+                    ELSE
+                        upper
+                    END AS upper_bounds,
+                (node_id << 1) | (point[i] >= opt.cutoff)::INT AS node_id
             FROM {source_table}
-            WHERE {border_cl2}
+            JOIN {prev_node}
+                USING (id)
+            JOIN optimize opt
+                USING (node_id)
         """.format(**locals())
-    plpy.execute(border_sql)
 
-    return kd_source_table, kd_border_table1, kd_border_table2
+        for i in range(max_depth):
+            DEBUG.plpy.info(segment_source_sql)
+            plpy.execute(segment_source_sql)
+            plpy.execute("""
+                DROP TABLE IF EXISTS {prev_node};
+                ALTER TABLE {next_cut} RENAME TO {prev_node}
+            """.format(**locals()))
 
+        if is_platform_pg():
+            # For postgres, there should only be 1 node,
+            #  so we assign it dist_id = __dist_id__ = 0
+            plpy.execute("""
+                CREATE TABLE {dist_map} AS
+                SELECT 0::BIGINT AS dist_id, 0::BIGINT as __dist_id__
+            """.format(**locals()))
+        else:
+            # Create implicit many-to-1 map from __dist_id__ to seg_id
+            temp_dist_table = 'dist_table' + self.unique_str
+            create_dist_table_sql = """
+                CREATE TEMP TABLE {temp_dist_table} AS
+                    SELECT generate_series(1,10000) AS __dist_id__
+                {distributed_by}
+            """.format(**locals())
+            plpy.execute(create_dist_table_sql)
+
+            # Create a 1-1 map from dist_id to __dist_id__,
+            # where dist_id's are re-ordered so they are evenly
+            # distributed among segments.  This should work even
+            # if there are gaps in the dist_id assignments (some
+            # dist_id's missing)
+            leaves_sql = """
+                     SELECT
+                         node_id AS dist_id,
+                         ROW_NUMBER() OVER() AS leaf_num
+                     FROM {prev_node}
+                     GROUP BY node_id
+            """.format(**locals())
+            leaves_cnt = plpy.execute("""
+                WITH leaves as ({leaves_sql})
+                SELECT COUNT(*) cnt from leaves
+                """.format(**locals()))[0]['cnt']
+            create_dist_map_sql = """
+                CREATE TEMP TABLE {dist_map} AS
+                WITH leaves as (
+                    {leaves_sql}
+                ), segmap AS (
+                     SELECT
+                         gp_segment_id AS seg_id,
+                         ROW_NUMBER() OVER(
+                             ORDER BY ctid, gp_segment_id
+                         ) AS leaf_num,
+                         __dist_id__
+                     FROM {temp_dist_table}
+                     LIMIT {leaves_cnt}
+                )
+                SELECT dist_id, __dist_id__
+                FROM leaves JOIN segmap
+                     USING (leaf_num)
+                {distributed_by}
+            """.format(**locals())
+            plpy.execute(create_dist_map_sql)
 
-def build_kd_tree(schema_madlib, source_table, expr_point,
-                  depth, n_features, eps, **kwargs):
-    """
-        KD-tree function to create a partitioning
-        Args:
-            @param schema_madlib        Name of the Madlib Schema
-            @param source_table         Training data table
-            @param output_table         Name of the table to store kd tree
-            @param expr_point           Name of the column with training data
-                                        or expression that evaluates to a
-                                        numeric array
-            @param depth                Depth of the kd tree
-            @param n_features           Number of features
-            @param eps                  The eps value defined by the user
-    """
-    with MinWarning("error"):
-
-        clauses = [' TRUE ']
-        border_cl1 = ' FALSE '
-        border_cl2 = ' FALSE '
-        clause_counter = 0
-        kd_array = []
-        for curr_level in range(depth):
-            curr_feature = (curr_level % n_features) + 1
-            for curr_leaf in range(pow(2,curr_level)):
-                clause = clauses[clause_counter]
-                cutoff_sql = """
-                    SELECT percentile_disc(0.5)
-                           WITHIN GROUP (
-                            ORDER BY ({expr_point})[{curr_feature}]
-                           ) AS cutoff
-                    FROM {source_table}
-                    WHERE {clause}
-                    """.format(**locals())
+            plpy.execute("""
+                DROP TABLE IF EXISTS {temp_dist_table}
+            """.format(**locals()))
 
-                cutoff = plpy.execute(cutoff_sql)[0]['cutoff']
-                cutoff = "NULL" if cutoff is None else cutoff
-                kd_array.append(cutoff)
-                clause_counter += 1
-                clauses.append(clause +
-                               "AND ({expr_point})[{curr_feature}] < {cutoff} ".
-                               format(**locals()))
-                clauses.append(clause +
-                               "AND ({expr_point})[{curr_feature}] >= {cutoff} ".
-                               format(**locals()))
-                border_cl1 = border_cl1 + """ OR (({expr_point})[{curr_feature}] >= {cutoff} - {eps}
-                                            AND ({expr_point})[{curr_feature}] <= {cutoff} + {eps})
-                                        """.format(**locals())
-                border_cl2 = border_cl2 + """ OR (({expr_point})[{curr_feature}] >= {cutoff} - (2*{eps})
-                                            AND ({expr_point})[{curr_feature}] <= {cutoff} + (2*{eps}))
-                                        """.format(**locals())
-
-        n_leaves = pow(2, depth)
-        case_when_clause = '\n'.join(["WHEN {0} THEN {1}::INTEGER".format(cond, i)
-                                     for i, cond in enumerate(clauses[-n_leaves:])])
-        return kd_array, case_when_clause, border_cl1, border_cl2
+        gen_internal_points_sql = """
+            CREATE TEMP TABLE {internal_points} AS
+            SELECT id, point, node_id AS leaf_id, node_id AS dist_id, __dist_id__
+            FROM {source_table}
+            JOIN {prev_node}
+                USING (id)
+            JOIN {dist_map}
+                ON node_id = dist_id
+            {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy.execute(gen_internal_points_sql)
 
+        plpy.execute("""
+            DROP TABLE IF EXISTS {prev_node}
+        """.format(**locals()))
+
+        generate_leaf_bounds_sql = """
+            CREATE TEMP TABLE {leaf_bounds_table} AS
+            SELECT
+               array_agg(min ORDER BY i) AS leaf_lower,
+               array_agg(max ORDER BY i) AS leaf_upper,
+               array_agg(min - {eps} ORDER BY i) AS dist_lower,
+               array_agg(max + {eps} ORDER BY i) AS dist_upper,
+               leaf_id,
+               __dist_id__
+            FROM (
+                SELECT
+                    leaf_id,
+                    i,
+                    min(point[i]),
+                    max(point[i]),
+                    count(*)
+                FROM {internal_points}, generate_series(1, {n_dims}) AS i
+                GROUP BY leaf_id, i
+            ) x
+            JOIN {dist_map}
+                ON leaf_id = dist_id
+            GROUP BY leaf_id, __dist_id__
+            {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy.execute(generate_leaf_bounds_sql)
+
+        gen_external_points_sql = """
+            CREATE TEMP TABLE {external_points} AS
+            SELECT i.id, i.point, i.leaf_id, b.leaf_id AS dist_id, b.__dist_id__
+            FROM {leaf_bounds_table} b JOIN {internal_points} i
+                ON b.leaf_id != i.leaf_id
+            WHERE 0 <= all({self.schema_madlib}.array_sub(point, dist_lower)) AND
+                  0 < all({self.schema_madlib}.array_sub(dist_upper, point))
+            {distributed_by}
+        """.format(**locals())
+        plpy.execute(gen_external_points_sql)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS
+                {leaf_bounds_table}, {dist_map}
+        """.format(**locals()))
 
 def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     expr_point, output_table, **kwargs):
@@ -504,108 +937,472 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
     with MinWarning("warning"):
 
         _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
-    expr_point, output_table)
+                                 expr_point, output_table)
 
         dbscan_summary_table = add_postfix(dbscan_table, '_summary')
         summary = plpy.execute("SELECT * FROM {0}".format(dbscan_summary_table))[0]
 
         eps = summary['eps']
         metric = summary['metric']
-        db_id_column = summary['id_column']
         sql = """
             CREATE TABLE {output_table} AS
-            SELECT __q1__.{id_column}, cluster_id, distance
+            WITH src AS (
+                SELECT ({id_column}) AS id, ({expr_point}) AS point FROM {source_table}
+            )
+            SELECT __q1__.id, cluster_id, distance
             FROM (
-                SELECT __t2__.{id_column}, cluster_id,
-                       min({schema_madlib}.{metric}(__t1__.__points__,
-                                                __t2__.{expr_point})) as 
distance
-                FROM {dbscan_table} AS __t1__, {source_table} AS __t2__
+                SELECT __t2__.id, cluster_id,
+                       min({schema_madlib}.{metric}(__t1__.point,
+                                                __t2__.point)) as distance
+                FROM {dbscan_table} AS __t1__, src AS __t2__
                 WHERE is_core_point = TRUE
-                GROUP BY __t2__.{id_column}, cluster_id
-                ) __q1__
-            WHERE distance <= {eps}
+                GROUP BY __t2__.id, cluster_id
+            ) __q1__ WHERE distance <= {eps}
             """.format(**locals())
         result = plpy.execute(sql)
 
-def find_core_points_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
-
-    SD = kwargs['SD']
-    if not state:
-        data = {}
-        SD['counter{0}'.format(leaf_id)] = 0
+class label:
+    UNLABELLED = 0
+    NOISE_POINT = -1
+
+pstats = None
+class dbscan_perfstats:
+    def __init__(self):
+        self.range_query_calls = 0
+        self.range_query_time = 0.0
+        self.range_query_candidates = 0
+        self.range_query_neighbors = 0
+
+    def show(self, msg):
+        DEBUG.plpy.info("{}_stats: range_query total calls = {}"
+             .format(msg, self.range_query_calls))
+
+        range_query_calls = self.range_query_calls if self.range_query_calls else 0.00001
+
+        DEBUG.plpy.info("{}_stats: range_query total time = {}s"
+             .format(msg, self.range_query_time))
+        DEBUG.plpy.info("{}_stats: range_query total candidates = {}"
+            .format(msg, self.range_query_candidates))
+        DEBUG.plpy.info("{}_stats: range_query total neighbors returned = {}"
+            .format(msg, self.range_query_neighbors))
+        DEBUG.plpy.info("{}_stats: range_query avg time = {}s"
+            .format(msg, self.range_query_time / range_query_calls))
+        DEBUG.plpy.info("{}_stats: range_query avg candidates returned = {}"
+            .format(msg, self.range_query_candidates * 1.0 / range_query_calls))
+        DEBUG.plpy.info("{}_stats: range_query avg neighbors returned = {}"
+            .format(msg, self.range_query_neighbors * 1.0 / range_query_calls))
+        self.__init__()  # Clear counters and start over
+
+def within_range_tf(point, candidate_ids, candidate_points,
+                    metric_cutoff, norm=2):
+    # Experimental TensorFlow 1.x variant (not used by default); assumes
+    #  tensorflow is imported as `tf` and `sess` is an active tf.Session.
+    sd = tf.squared_difference(tf.constant(candidate_points), tf.constant(point))
+    distances = tf.math.reduce_sum(sd, 1)
+    output = tf.where(distances <= metric_cutoff)
+    res = output.eval(session=sess)
+    res.shape = res.shape[0]
+    return candidate_ids[res]
+
+def within_range_np_linalg(point, candidate_ids, candidate_points,
+                           eps, norm=2):
+    distances = np.linalg.norm(candidate_points - point, ord=norm, axis=1)
+    return candidate_ids[distances <= eps]
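+# Illustrative call with made-up values: for point = np.array([0., 0.]),
+#  candidate_ids = np.array([7, 11, 42]),
+#  candidate_points = np.array([[0., 0.], [1., 1.], [3., 4.]]) and eps = 1.5,
+#  within_range_np_linalg() returns array([7, 11]); the point at [3., 4.]
+#  lies at distance 5.0 > eps and is excluded.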
+
+def within_range_np(point, candidate_ids, candidate_points,
+                    metric_cutoff, norm=2):
+    if norm==1:
+        distances = np.sum(np.abs(candidate_points - point), axis=1)
     else:
-        data = SD['data{0}'.format(leaf_id)]
+        distances = np.sum(np.square(candidate_points - point), axis=1)
+    return candidate_ids[distances <= metric_cutoff]
 
-    data[id_in] = expr_points
-    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
-    SD['data{0}'.format(leaf_id)] = data
-    ret = [[-1,-1],[-1,-1]]
+class dbscan_record(object):
+    def __init__(self, obj, eps=None):
+        if eps is not None:
+            self._init_from_dict(obj, eps)
+        else:
+            self._clone(obj)
 
-    my_n_rows = n_rows[leaf_id]
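+    # A row whose dist_id equals its leaf_id is an internal point of that leaf;
+    #  a row whose dist_id differs is an external copy of a point lying within
+    #  eps of a neighboring leaf's boundary (see the tree-building code above).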
+    @staticmethod
+    def from_dict(d, eps):
+        if d['leaf_id'] == d['dist_id']:
+            return dbscan_internal_record(d, eps)
+        else:
+            return dbscan_external_record(d, eps)
+
+    def _init_from_dict(self, d, eps):
+        """
+            dbscan_record dict constructor
+            The only reason for passing eps here is so we
+            can pre-compute the point's neighborhood for later
+        """
+        self.id = d['id']
+        self.point = d['point']
+        self.is_core_point = False
+        self.cluster_id = label.UNLABELLED
+        self.leaf_id = d['leaf_id']
+        self.dist_id = d['dist_id']
+
+        # private members (for convenience, not for output)
+        p = np.array(self.point)
+        self._np_point = p
+        self._bounds = np.concatenate([p, p])
+        self._neighborhood = np.concatenate([p - eps, p + eps])
+
+    def _clone(self, rec):
+        """
+            dbscan_record copy constructor - we call this
+            just before outputting any row.  For external
+            points, we have to clone it each time so that
+            we can output multiple rows.  For internal points,
+            it's just a quick way of cleaning it up (removing
+            the extra private vars).
+        """
+        # Copy only public members
+        self.id = rec.id
+        self.point = rec.point
+        self.is_core_point = rec.is_core_point
+        self.cluster_id = rec.cluster_id
+        self.leaf_id = rec.leaf_id
+        self.dist_id = rec.dist_id
+
+    def __repr__(self):
+        fmt = 'id={},point[{}],cluster_id={},core={},leaf_id={},dist_id={}'
+        rep = fmt.format(self.id, len(self.point), self.cluster_id,
+                         self.is_core_point, self.leaf_id, self.dist_id)
+        return 'dbscan_record({})'.format(rep)
+
+    def __eq__(self, other):
+        if not isinstance(other, dbscan_record):
+            return False
+        return self.__dict__ == other.__dict__
+
+class dbscan_internal_record(dbscan_record):
+    def __init__(self, obj, eps=None):
+        dbscan_record.__init__(self, obj, eps)
+
+    def _init_from_dict(self, d, eps):
+        """
+            private members specific to internal records
+        """
+        dbscan_record._init_from_dict(self, d, eps)
+        self._external_neighbors = []
+        self._internal_neighbor_count = 0
+        self._possible_border_points = []
+
+    def not_core_point(self, min_samples):
+        """
+            Computes self.is_core_point from internal and
+            external neighbor counts, and returns its negation
+        """
+        ext_neighbors = len(self._external_neighbors)
+        int_neighbors = self._internal_neighbor_count
+        num_neighbors = ext_neighbors + int_neighbors
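+        # The +1 below counts the point itself, matching the usual DBSCAN
+        #  definition that a core point has at least min_samples points
+        #  (itself included) within its eps-neighborhood.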
+        self.is_core_point = (num_neighbors + 1 >= min_samples)
+        return not self.is_core_point
+
+class dbscan_external_record(dbscan_record):
+    def __init__(self, obj, eps=None):
+        dbscan_record.__init__(self, obj, eps)
+
+    def _init_from_dict(self, d, eps):
+        """
+            private members specific to external records
+        """
+        dbscan_record._init_from_dict(self, d, eps)
+        self._nearby_clusters = set()
+
+class LeafStorage:
+    """
+    LeafStorage
 
-    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+    This is a per-leaf storage class for dbscan_leaf().
+    There are 3 containers within LeafStorage:
 
-        core_counts = {}
-        core_lists = {}
-        p = index.Property()
-        p.dimension = len(expr_points)
-        idx = index.Index(properties=p)
-        ret = []
+    rtree - a spatial index of the id's of the leaf's internal points
+    records - a hash table for looking up dbscan_internal_records by id
+    external_records - an ordered list of dbscan_external_records
 
-        if metric == 'dist_norm1':
-            fn_dist = distance.cityblock
-        elif metric == 'dist_norm2':
-            fn_dist = distance.euclidean
+    Methods:
+
+    add_point() - adds a point to LeafStorage
+    range_query() - returns a list of internal point id's within eps of a point
+    update_neighbors() - updates internal neighbor counts based on a point and
+                         list of neighboring internal points
+    label_point() - assigns a point to a specific cluster, or marks it as noise
+
+    yield_external_records() - clones and yields a copy of each dbscan_external_record
+                               based on its list of nearby_clusters
+    """
+    def __init__(self, dimension, dtype, eps, metric, min_samples):
+        self.n_dims = dimension
+        self.dtype = dtype
+        self.eps = eps
+        self.metric = metric
+        self.min_samples = min_samples
+        props = index.Property(dimension=dimension)
+        self.props = props
+        self.rtree = index.Index(properties=props)
+        self.records = {}
+        self.external_records = []
+
+    def add_point(self, rec):
+        if rec.leaf_id == rec.dist_id:
+            self.rtree.add(rec.id, rec._bounds)
+            self.records[rec.id] = rec
         else:
-            fn_dist = distance.sqeuclidean
+            self.external_records.append(rec)
+
+    def range_query(self, db_rec):
+        """
+        Returns the ids of the internal points within eps distance of
+        db_rec's point.  The r-tree index is used to narrow the set of
+        candidates before exact distances are computed.  For internal
+        records the point is also removed from the r-tree and its
+        neighbors' counts are updated as a side effect; for external
+        records, each internal neighbor records the external point.
+        """
+        global pstats
+        pstats.range_query_calls += 1
+        start_time = time()
+
+        eps = self.eps
+        metric = self.metric
+        records = self.records
+        rtree = self.rtree
+
+        if metric not in ('squared_dist_norm2', 'dist_norm2', 'dist_norm1'):
+            plpy.error("Sorry, the optimized method does not support the {} metric yet".format(metric))
+
+        norm = 1 if metric == 'dist_norm1' else 2
+        metric_cutoff = eps if metric == 'dist_norm1' else eps*eps
+
+        if isinstance(db_rec, dbscan_internal_record):
+            # As long as we update the neighbor counts of
+            #  a core point's neighbors before returning,
+            #  we can safely remove it from the rtree for
+            #  future searches.  If this is a non-core point,
+            #  we still delete it, but add its id to the list of
+            #  _possible_border_points in the db_rec of each
+            #  of its unlabelled neighbors
+            self.rtree.delete(db_rec.id, db_rec._bounds)
+
+        DEBUG.start_timing('rtree_count')
+        n = rtree.count(db_rec._neighborhood)
+        DEBUG.print_timing('rtree_count')
+
+        DEBUG.start_timing('rtree_intersection')
+        ids = rtree.intersection(db_rec._neighborhood, objects=False)
+        DEBUG.print_timing('rtree_intersection')
+
+        candidate_ids = np.empty(n, dtype='int64')
+        candidate_points = np.empty([n, self.n_dims], dtype=self.dtype)
+        i = 0
+        for id in ids:
+            candidate_ids[i] = id
+            candidate_points[i] = records[id]._np_point
+            i += 1
+
+        # Avoid catastrophic issues which are extremely difficult to debug
+        assert i == n
+
+        pstats.range_query_candidates += n
+
+        DEBUG.start_timing('within_range_np_linalg')
+        neighbors = within_range_np_linalg(
+            db_rec._np_point,
+            candidate_ids,
+            candidate_points,
+            eps,
+            norm
+        )
+        DEBUG.print_timing('within_range_np_linalg')
+
+        pstats.range_query_neighbors += len(neighbors)
+        end_time = time()
+        pstats.range_query_time += (end_time - start_time)
+
+        if isinstance(db_rec, dbscan_internal_record):
+            # Necessary because we are deleting it from the rtree
+            #  (won't show up in future searches)
+            self.update_neighbors(db_rec, neighbors)
+            neighbors = np.concatenate([neighbors, np.array(db_rec._possible_border_points, dtype='int64')])
+        elif isinstance(db_rec, dbscan_external_record):
+            for id in neighbors:
+                records[id]._external_neighbors.append(db_rec)
+        else:
+            raise Exception("range_query() received unexpected type {}".format(type(db_rec)))
 
-        for key1, value1 in data.items():
-            idx.add(key1,value1+value1,key1)
+        return neighbors
 
-        for key1, value1 in data.items():
+    def update_neighbors(self, db_rec, neighbors):
+        db_rec._internal_neighbor_count += len(neighbors)
 
-            v1 = []
-            v2 = []
-            for coor in value1:
-                v1.append(coor-eps)
-                v2.append(coor+eps)
+        not_core_point = db_rec.not_core_point(self.min_samples)
 
-            # Array concat
-            v = v1+v2
-            hits = idx.intersection(v)
+        for n_id in neighbors:
+            n_rec = self.records[n_id]
+            n_rec._internal_neighbor_count += 1
+            if not_core_point and n_rec.cluster_id == label.UNLABELLED:
+                n_rec._possible_border_points.append(db_rec.id)
 
-            if key1 not in core_counts:
-                core_counts[key1] = 0
-                core_lists[key1] = []
 
-            for key2 in hits:
-                value2 = data[key2]
-                dist = fn_dist(value1, value2)
-                if dist <= eps:
-                    core_counts[key1] += 1
-                    core_lists[key1].append(key2)
+    def label_point(self, rec, cluster):
+        """
+            Labels a point as a member of a particular cluster, or marks
+            it as noise when cluster = label.NOISE_POINT.
+        """
+        rec.cluster_id = cluster
 
-        for key1, value1 in core_counts.items():
-            if value1 >= min_samples:
-                for key2 in core_lists[key1]:
-                    ret.append([key1,key2])
+        if isinstance(rec, dbscan_internal_record):
+            if cluster != label.NOISE_POINT:
+                # Mark nearby external points as near this cluster
+                for ext_neighbor in rec._external_neighbors:
+                    ext_neighbor._nearby_clusters.add(cluster)
 
-    return ret
+    def yield_external_records(self):
+        external_records = self.external_records
 
-def find_core_points_merge(state1, state2, **kwargs):
+        for rec in external_records:
+            for cluster_id in rec._nearby_clusters:
+                rec_clone = dbscan_external_record(rec)
+                self.label_point(rec_clone, cluster_id)
+                yield rec_clone
 
-    if not state1:
-        return state2
-    elif not state2:
-        return state1
+DEBUG_WITH_TRACEBACKS = False
+
+# Just for debugging... so we receive the traceback
+def dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
+                num_external_points, SD, **kwargs):
+    res = _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
+        num_external_points, SD)
+
+    if DEBUG_WITH_TRACEBACKS:
+        ret = []
+        for i, db_rec in enumerate(res):
+            ret.append(db_rec)  # Returning a generator breaks the traceback forwarding
+        return ret
     else:
-        plpy.error("dbscan Error: A kd-tree leaf should be on a single segment.")
+        return res
 
+def _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
+                num_external_points, SD):
+    """
+        Performs classic dbscan algorithm in parallel on each leaf
+    """
+    global pstats
 
-def find_core_points_final(state, **kwargs):
+    num_points = num_internal_points + num_external_points
 
-    return state
+    db_rec = dbscan_record.from_dict(db_rec, eps)
+    id = db_rec.id
+    dist_id = db_rec.dist_id
 
+    if dist_id in SD:
+        leaf = SD[dist_id]
+    else:
+        pstats = dbscan_perfstats()
+        DEBUG.start_timing("dbscan_leaf")
+        DEBUG.start_timing("dbscan_leaf_load_rtree")
+        leaf = LeafStorage( len(db_rec.point),
+                            db_rec._np_point.dtype,
+                            eps,
+                            metric,
+                            min_samples
+                          )
+        SD[dist_id] = leaf
+
+    records = leaf.records
+    external_records = leaf.external_records
+
+    leaf.add_point(db_rec)
+
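+    # _dbscan_leaf() is called once per row: points are accumulated into this
+    #  segment's LeafStorage (kept in SD) until the whole leaf has arrived, and
+    #  only then does the block below run the actual DBSCAN pass for the leaf.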
+    if len(records) == num_internal_points and \
+       len(external_records) == num_external_points:
+        DEBUG.print_timing("dbscan_leaf_load_rtree")
+        DEBUG.start_timing("dbscan_leaf_external")
+
+        # Find all internal neighbors of external points
+        for ext_rec in external_records:
+            DEBUG.start_timing("external_rangequery")
+            # We don't need to save the result, we only care about
+            #  the side effect of adding this point to the list of
+            #  neighbor._ext_neighbors for each internal neighbor found
+            _ = leaf.range_query(ext_rec)
+            DEBUG.print_timing("external_rangequery")
+
+        DEBUG.print_timing("dbscan_leaf_external")
+        pstats.show("external")
+        pstats = dbscan_perfstats()
+
+        # Classify internal points into clusters (main dbscan algorithm),
+        #  and yield resulting internal records
+        DEBUG.start_timing("dbscan_leaf_internal")
+        cluster = 0
+        for rec in records.values():
+            if rec.cluster_id != label.UNLABELLED:
+                continue
+
+            DEBUG.start_timing('internal_rangequery')
+            neighbors = leaf.range_query(rec)
+            DEBUG.print_timing('internal_rangequery')
+
+            if not rec.is_core_point: # rec.is_core_point field will be set inside range_query(rec)
+                leaf.label_point(rec, label.NOISE_POINT)
+                continue
+
+            cluster += 1
+
+            leaf.label_point(rec, cluster)
+            yield dbscan_record(rec)
+
+            queue = deque([neighbors])
+            DEBUG.start_timing('internal_per_cluster')
+            while len(queue) > 0:
+                for next_id in queue.popleft():
+                    rec2 = records[next_id]
+                    if rec2.cluster_id != label.NOISE_POINT and \
+                       rec2.cluster_id != label.UNLABELLED:
+                        # We delete all points from the unlabelled
+                        # rtree as soon as we assign them to a cluster.
+                        # But we will still get here if the same point
+                        # gets returned by more than one range query
+                        # and added to the queue before the first reference
+                        # to it gets popped and processed.  The point will
+                        # be labelled as a part of this cluster the first time
+                        # it gets popped.  We can ignore any duplicate references
+                        # to it that get popped after that.
+                        continue
+
+                    DEBUG.start_timing('internal_rangequery')
+                    neighbors = leaf.range_query(rec2)
+                    DEBUG.print_timing('internal_rangequery')
+
+                    if not rec2.is_core_point:  # rec2.is_core_point field will be set inside range_query(rec2)
+                        # Label as border point
+                        leaf.label_point(rec2, cluster)
+                        yield dbscan_record(rec2)
+                        continue
+
+                    queue.append(neighbors)
+                    # Label as core point
+                    leaf.label_point(rec2, cluster)
+                    yield dbscan_record(rec2)
+
+            DEBUG.print_timing('internal_per_cluster')
+
+        DEBUG.print_timing("dbscan_leaf_internal")
+        pstats.show("internal")
+
+        # Return rows for external records (one for each
+        #  cluster an external point is close to)
+        DEBUG.start_timing("yield_external_records")
+        for ext_rec in leaf.yield_external_records():
+            yield ext_rec
+        DEBUG.print_timing("yield_external_records")
+
+        # Clean up for next time, if there is one
+        del SD[dist_id]
+
+        DEBUG.print_timing("dbscan_leaf")
 
 # The snowflake table is used to reduce the size of the edge table.
 # Note that the sole purpose of the edge table is finding the connected
@@ -709,14 +1506,21 @@ def sf_final(state, **kwargs):
     return state
 
 def _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-    expr_point, eps, min_samples, metric, algorithm, depth):
+    expr_point, eps, min_samples, metric, algorithm, max_depth):
 
     input_tbl_valid(source_table, 'dbscan')
     output_tbl_valid(output_table, 'dbscan')
     output_summary_table = add_postfix(output_table, '_summary')
     output_tbl_valid(output_summary_table, 'dbscan')
 
-    cols_in_tbl_valid(source_table, [id_column], 'dbscan')
+    _assert(is_var_valid(source_table, id_column),
+            "dbscan error: {0} is an invalid column or "
+            "expression for id param".format(id_column))
+
+    id_col_type = get_expr_type(id_column, source_table)
+    _assert(is_valid_psql_type(id_col_type, INTEGER),
+            "dbscan Error: unique id column or expression '{0}' in input table 
must"
+            " evaluate to an integer type.".format(id_column))
 
     _assert(is_var_valid(source_table, expr_point),
             "dbscan error: {0} is an invalid column name or "
@@ -724,19 +1528,22 @@ def _validate_dbscan(schema_madlib, source_table, output_table, id_column,
 
     point_col_type = get_expr_type(expr_point, source_table)
     _assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY),
-            "dbscan Error: Feature column or expression '{0}' in train table 
is not"
+            "dbscan Error: Feature column or expression '{0}' in input table 
is not"
             " a numeric array.".format(expr_point))
 
     _assert(eps > 0, "dbscan Error: eps has to be a positive number")
 
     _assert(min_samples > 0, "dbscan Error: min_samples has to be a positive number")
 
-    fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2', 'dist_angle', 'dist_tanimoto']
-    _assert(metric in fn_dist_list, "dbscan Error: metric has to be one of the madlib defined distance functions")
+    fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2',
+                    'dist_angle', 'dist_tanimoto']
+    _assert(metric in fn_dist_list,
+        "dbscan Error: metric has to be one of the madlib defined distance 
functions")
 
-    _assert(algorithm == BRUTE_FORCE or RTREE_ENABLED == 1,
-        "dbscan Error: Cannot use kd_tree without the necessary python module: 
rtree")
-    _assert(depth > 0, "dbscan Error: depth has to be a positive number")
+    _assert(algorithm == METHOD_BRUTE_FORCE or RTREE_ENABLED == 1,
+        "dbscan Error: Cannot use rtree without the necessary python module: 
rtree")
+    _assert(max_depth is None or max_depth >= 0,
+        "dbscan Error: max_segmentation_depth must be a non-negative integer 
or NULL")
 
 def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table,
     id_column, expr_point, output_table):
@@ -747,7 +1554,14 @@ def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table,
     input_tbl_valid(dbscan_summary_table, 'dbscan')
     output_tbl_valid(output_table, 'dbscan')
 
-    cols_in_tbl_valid(source_table, [id_column], 'dbscan')
+    _assert(is_var_valid(source_table, id_column),
+            "dbscan error: {0} is an invalid column name or "
+            "expression for id param".format(id_column))
+
+    id_col_type = get_expr_type(id_column, source_table)
+    _assert(is_valid_psql_type(id_col_type, INTEGER),
+            "dbscan Error: unique id column or expression '{0}' in input table 
must"
+            " evaluate to an integer type.".format(id_column))
 
     _assert(is_var_valid(source_table, expr_point),
             "dbscan error: {0} is an invalid column name or "
@@ -779,14 +1593,24 @@ def dbscan_help(schema_madlib, message=None, **kwargs):
 SELECT {schema_madlib}.dbscan(
     source_table,       -- Name of the training data table
     output_table,       -- Name of the output table
-    id_column,          -- Name of id column in source_table
+    id_column,          -- A column or expression of columns specifying a unique id for rows in the source_table
     expr_point,         -- Column name or expression for data points
     eps,                -- The minimum radius of a cluster
     min_samples,        -- The minimum size of a cluster
     metric,             -- The name of the function to use to calculate the
                         -- distance
-    algorithm,          -- The algorithm to use for dbscan: brute or kd_tree
-    depth               -- The depth for the kdtree algorithm
+    algorithm,          -- The algorithm to use for dbscan: brute_force or optimized (default=optimized)
+    max_segmentation_depth  -- This has no effect on algorithm=brute_force.  For algorithm=optimized,
+                               this limits the number of splits to be considered while dividing the
+                               dataset into smaller spatial regions for optimal parallel performance.
+                               The default (NULL or omitted) value means it will be unlimited, but note
+                               that it will never break the data up into more regions than there are
+                               segments in a Greenplum cluster, and in some cases it may choose fewer.
+                               Setting this to a non-NULL value will decrease up-front overhead, and can
+                               be useful if you expect to find only a small number of densely populated
+                               clusters, or if the dataset itself is small enough that initial planning
+                               overhead matters more than scaling performance.  This parameter affects
+                               only performance, not the output.
     );
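+
+For example (illustrative table and column names):
+
+SELECT {schema_madlib}.dbscan('my_points', 'my_dbscan_out', 'pt_id', 'coords',
+                              1.5, 5, 'squared_dist_norm2', 'optimized');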
 
 -----------------------------------------------------------------------
@@ -794,10 +1618,10 @@ SELECT {schema_madlib}.dbscan(
 -----------------------------------------------------------------------
 The output of the dbscan function is a table with the following columns:
 
-id_column           The ids of test data point
-cluster_id          The id of the points associated cluster
+id                  The id of each point that has been assigned to a cluster
+cluster_id          The id of the cluster assigned to each point
 is_core_point       Boolean column that indicates if the point is core or not
-points              The column or expression for the data point
+point               The coordinates of each classified point
 """
     else:
         help_string = """
diff --git a/src/ports/postgres/modules/dbscan/dbscan.sql_in b/src/ports/postgres/modules/dbscan/dbscan.sql_in
index 7e1df05..41a96a2 100644
--- a/src/ports/postgres/modules/dbscan/dbscan.sql_in
+++ b/src/ports/postgres/modules/dbscan/dbscan.sql_in
@@ -84,11 +84,11 @@ dbscan( source_table,
 <dd>TEXT. Name of the table containing the clustering results.
 </dd>
 
-<dt>id_column</dt>
-<dd>TEXT. Name of the column containing a unique integer id for each training point.
+<dt>id</dt>
+<dd>TEXT. Name of a column or expression containing a unique integer id for each training point.
 </dd>
 
-<dt>expr_point</dt>
+<dt>point</dt>
 <dd>TEXT. Name of the column with point coordinates in array form,
 or an expression that evaluates to an array of point coordinates.
 </dd>
@@ -131,13 +131,36 @@ performance reasons.</li>
 </dd>
 
 <dt>algorithm (optional)</dt>
-<dd>TEXT, default: 'brute_force'. The name of the algorithm
-used to compute clusters. Currently only brute force is supported:
+<dd>TEXT, default: 'optimized'. The name of the algorithm
+used to compute clusters.
 <ul>
-<li><b>\ref brute_force</b>: Produces an exact result by searching
-all points in the search space.  Brute force can be slow and is intended
-to be used for small datasets only.  You can also use a short
-form "b" or "brute" etc. to select brute force.</li>
+<li><b>\ref brute_force</b>: Brute force can be slow and should
+not be used for large datasets.  However, it does have less initial
+setup overhead than the parallel-optimized algorithm, which may make
+it faster for some cases involving small datasets.  You can also use
+a short form "b" or "brute" etc. to select brute force.</li>
+
+<li><b>\ref optimized</b>: This uses an r-tree index to accelerate range
+queries while performing the cluster detection.  It is designed with Greenplum
+(gpdb) in mind, in which case it intelligently partitions the space into a
+number of connected regions, runs DBSCAN in parallel on each region, and then
+merges the results together.  By default, the maximum number of regions it
+will consider using is equal to the number of segments in the database, but if
+you suspect it may be spending too much time on the segmentation, this can be
+limited further by setting the max_segmentation_depth parameter to something
+lower.  This should decrease the segmentation overhead, but will also decrease
+the amount of parallelism.  If the dataset is relatively small, another option
+to consider is brute force, which has very little overhead but won't scale as
+well.  For large enough datasets, and an appropriate choice of eps, this
+algorithm should be able to run in O((N/S) log N) time, where N is the number
+of rows (points) in the input and S is the number of regions (often equal to
+the number of segments, but possibly less).  The worst case for brute force is
+O(N^2).  eps should be chosen based on the density of the dataset; DBSCAN
+works best for datasets whose clusters have a roughly uniform density.  If eps
+is chosen too large, the runtime will explode and nearly every point will be
+considered connected to every other point in one large cluster.  Best practice
+is to start with a relatively small eps, so it can return the first result
+faster; if it looks like there are too many small clusters, increase eps and
+allow it to run for longer.  An illustrative call is shown after this argument
+list.
+
 </ul></dd>
 </dl>
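+
+For example, to cap the planning overhead one could limit the segmentation
+depth (table, column names and parameter values below are illustrative only):
+<pre class="example">
+SELECT madlib.dbscan('my_points', 'my_dbscan_out', 'pt_id', 'coords',
+                     1.5, 5, 'dist_norm2', 'optimized', 2);
+</pre>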
 
@@ -146,23 +169,25 @@ form "b" or "brute" etc. to select brute force.</li>
 The output table for DBSCAN module has the following columns:
 <table class="output">
     <tr>
-        <th>id_column</th>
-        <td>INTEGER. Test data point id.</td>
+        <th>id</th>
+        <td>SMALLINT|INTEGER|BIGINT. A column or expression which specifies
+            unique ids for each row in the training dataset.  Must evaluate
+            to an integer type.</td>
     </tr>
     <tr>
         <th>cluster_id</th>
-        <td>INTEGER. Cluster id associated with the test data point.</td>
+        <td>INTEGER. The cluster id of each classified data point.</td>
     </tr>
     <tr>
         <th>is_core_point</th>
-        <td>BOOLEAN. Indicates if the test data point is core
+        <td>BOOLEAN. Indicates if the training data point is core
         If it is not a core point and listed in the output table,
         it is a border point.  Noise points are not shown in this
         table.</td>
     </tr>
     <tr>
-        <th>__points__</th>
-        <td>TEXT. Column or expression for the data point.</td>
+        <th>point</th>
+        <td>TEXT. The coordinates of each classified point.</td>
     </tr>
 </table>
 
@@ -174,8 +199,8 @@ After clustering, the cluster assignment for each data point can be computed:
 <pre class="syntax">
 dbscan_predict( dbscan_table,
                 source_table,
-                id_column,
-                expr_point,
+                id,
+                point,
                 output_table
                 )
 </pre>
@@ -190,11 +215,12 @@ dbscan_predict( dbscan_table,
 <dd>TEXT. Name of the table containing the input data points.
 </dd>
 
-<dt>id_column</dt>
-<dd>TEXT. Name of the column containing a unique integer id for each training point.
+<dt>id</dt>
+<dd>VARCHAR.  A column name or expression which selects a unique integer
+              id for each training point.
 </dd>
 
-<dt>expr_point</dt>
+<dt>point</dt>
 <dd>TEXT. Name of the column with point coordinates in array form,
 or an expression that evaluates to an array of point coordinates.
 </dd>
@@ -209,8 +235,8 @@ or an expression that evaluates to an array of point coordinates.
 The output is a table with the following columns:
 <table class="output">
     <tr>
-      <th>id_column</th>
-      <td>INTEGER. ID column passed to the function.</td>
+      <th>id</th>
+      <td>BIGINT. The unique id of each row, as it was passed in.</td>
     </tr>
     <tr>
       <th>cluster_id</th>
@@ -222,7 +248,6 @@ The output is a table with the following columns:
     </tr>
 </table>
 
-
 @anchor examples
 @par Examples
 
@@ -322,7 +347,7 @@ The summary table lists the 'eps' value and the distance metric used:
 SELECT * FROM dbscan_result_summary;
 </pre>
 <pre class="result">
- id_column | eps  |   metric
+ id | eps  |   metric
 -----------+------+------------
  pid       | 1.75 | dist_norm2
 </pre>
@@ -378,7 +403,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan(
     min_samples                 INTEGER,
     metric                      VARCHAR,
     algorithm                   VARCHAR,
-    depth                       INTEGER
+    max_segmentation_depth      INTEGER
 ) RETURNS VOID AS $$
     PythonFunction(dbscan, dbscan, dbscan)
 $$ LANGUAGE plpythonu VOLATILE
@@ -415,7 +440,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan(
     source_table                VARCHAR,
     output_table                VARCHAR,
     id_column                   VARCHAR,
-    expr_point                  VARCHAR,
+    point                       VARCHAR,
     eps                         DOUBLE PRECISION,
     min_samples                 INTEGER
 ) RETURNS VOID AS $$
@@ -473,30 +498,30 @@ $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
-    state1          INTEGER[][],
-    state2          INTEGER[][]
-) RETURNS INTEGER[][] AS $$
+    state1          BIGINT[][],
+    state2          BIGINT[][]
+) RETURNS BIGINT[][] AS $$
 PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
     return dbscan.sf_merge(**globals())
 $$ LANGUAGE plpythonu
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
-    state INTEGER[][]
-) RETURNS INTEGER[][] AS $$
+    state BIGINT[][]
+) RETURNS BIGINT[][] AS $$
 PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
     return dbscan.sf_final(**globals())
 $$ LANGUAGE plpythonu
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
-    state                       INTEGER[],
+    state                       BIGINT[],
     src                         BIGINT,
     dest                        BIGINT,
     n_rows                      INTEGER[],
     gp_segment_id               INTEGER
 
-) RETURNS INTEGER[][] AS $$
+) RETURNS BIGINT[][] AS $$
 PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
     return dbscan.sf_transition(**globals())
 $$ LANGUAGE plpythonu
@@ -513,77 +538,245 @@ CREATE AGGREGATE MADLIB_SCHEMA.build_snowflake_table(
     INTEGER[],
     INTEGER
 )(
-    STYPE=INTEGER[][],
+    STYPE=BIGINT[][],
     SFUNC=MADLIB_SCHEMA.sf_transition,
     m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
     FINALFUNC=MADLIB_SCHEMA.sf_final
 );
 
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_transition(
-    state                       INTEGER[],
-    id_in                         BIGINT,
-    expr_points DOUBLE PRECISION[],
-    eps     DOUBLE PRECISION,
-    min_samples INTEGER,
-    metric  VARCHAR,
-    n_rows  INTEGER[],
-    leaf_id     INTEGER
-
-) RETURNS INTEGER[][] AS $$
-PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
-    return dbscan.find_core_points_transition(**globals())
-$$ LANGUAGE plpythonu
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
-
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_merge(
-    state1          INTEGER[][],
-    state2          INTEGER[][]
-) RETURNS INTEGER[][] AS $$
-PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
-    return dbscan.find_core_points_merge(**globals())
-$$ LANGUAGE plpythonu
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
-
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.find_core_points_final(
-    state INTEGER[][]
-) RETURNS INTEGER[] AS $$
-PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
-    return dbscan.find_core_points_final(**globals())
-$$ LANGUAGE plpythonu
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
-
---id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
-DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.find_core_points(
-    BIGINT,
-    DOUBLE PRECISION[],
-    DOUBLE PRECISION,
-    INTEGER,
-    VARCHAR,
-    INTEGER[],
-    INTEGER);
-CREATE AGGREGATE MADLIB_SCHEMA.find_core_points(
-    BIGINT,
-    DOUBLE PRECISION[],
-    DOUBLE PRECISION,
-    INTEGER,
-    VARCHAR,
-    INTEGER[],
-    INTEGER
-)(
-    STYPE=INTEGER[][],
-    SFUNC=MADLIB_SCHEMA.find_core_points_transition,
-    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.find_core_points_merge,')
-    FINALFUNC=MADLIB_SCHEMA.find_core_points_final
+DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_record CASCADE;
+CREATE TYPE MADLIB_SCHEMA.__dbscan_record AS (
+    id              BIGINT,
+    point           DOUBLE PRECISION[],
+    is_core_point   BOOL,
+    cluster_id      BIGINT,
+    leaf_id         INT,
+    dist_id         INT
 );
 
-DROP TYPE IF EXISTS MADLIB_SCHEMA.unpacked_2d CASCADE;
-CREATE TYPE MADLIB_SCHEMA.unpacked_2d AS (
+DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_edge CASCADE;
+CREATE TYPE MADLIB_SCHEMA.__dbscan_edge AS (
     src BIGINT,
     dest BIGINT);
 
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.unpack_2d (packed INTEGER[][])
-  RETURNS SETOF MADLIB_SCHEMA.unpacked_2d
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_unpack_edge (packed BIGINT[][])
+  RETURNS SETOF MADLIB_SCHEMA.__dbscan_edge
 AS $$
     return packed
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_safe_ln(x DOUBLE PRECISION)
+RETURNS DOUBLE PRECISION AS
+$$
+    SELECT CASE WHEN $1 > 0.0
+        THEN ln($1)
+        ELSE -1000.0
+    END
+$$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_safe_exp(x DOUBLE PRECISION)
+RETURNS DOUBLE PRECISION AS
+$$
+    SELECT CASE WHEN $1 > -600.0
+        THEN exp($1)
+        ELSE 0.0
+    END
+$$ LANGUAGE SQL IMMUTABLE;
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.prod(DOUBLE PRECISION);
+CREATE AGGREGATE MADLIB_SCHEMA.prod(DOUBLE PRECISION)
+(
+   sfunc = float8mul,
+   stype = DOUBLE PRECISION
+);
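+-- For example, SELECT MADLIB_SCHEMA.prod(x)
+--  FROM unnest(ARRAY[2.0, 3.0, 4.0]::DOUBLE PRECISION[]) AS x returns 24.
+--  The segmentation code uses it to multiply per-dimension extents into a
+--  node volume.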
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_split_volume(
+    V           DOUBLE PRECISION, -- $1
+    eps_bin     DOUBLE PRECISION, -- $2
+    eps_bins    DOUBLE PRECISION  -- $3
+) RETURNS DOUBLE PRECISION
+AS
+$$ SELECT
+    $2 / $3 * $1
+$$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_estimate_neighbors(
+    internal_points     DOUBLE PRECISION, -- $1
+    V                   DOUBLE PRECISION, -- $2
+    eps                 DOUBLE PRECISION, -- $3
+    D                   BIGINT           -- $4
+) RETURNS DOUBLE PRECISION
+AS
+$$ SELECT
+    CASE WHEN $2 > 0.0
+    THEN
+        $1 * ($3^$4 / $2)
+    ELSE
+        1.0
+    END
+$$ LANGUAGE SQL IMMUTABLE;
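+-- Illustrative example: with internal_points=1000, V=100.0, eps=0.5 and D=2,
+--  the estimate is 1000 * (0.5^2 / 100.0) = 2.5 expected neighbors per point.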
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_range_query_penalty(
+    internal_points BIGINT,    -- $1
+    neighbors DOUBLE PRECISION -- $2
+) RETURNS DOUBLE PRECISION
+AS
+$$ SELECT
+    greatest(
+        $2,
+        CASE WHEN $1 > 2
+            -- 60x faster than log(internal_points, 2.0)
+            -- due to DOUBLE PRECISION types instead of NUMERIC
+            THEN ceil(ln($1::DOUBLE PRECISION) / ln(2.0::DOUBLE PRECISION))
+            ELSE 1.0
+        END
+    )
+$$ LANGUAGE SQL IMMUTABLE;
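+-- Illustrative example: with internal_points=1000 and neighbors=2.5 (as
+--  estimated above), the penalty is greatest(2.5, ceil(log2(1000))) = 10,
+--  i.e. a range query costs at least O(log N) even when few neighbors are
+--  expected.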
+
+DROP TYPE IF EXISTS MADLIB_SCHEMA.__dbscan_losses;
+CREATE TYPE MADLIB_SCHEMA.__dbscan_losses AS (
+    left_avail      BIGINT,
+    right_avail     BIGINT,
+    left_loss       DOUBLE PRECISION,
+    right_loss      DOUBLE PRECISION
+);
+
+-- Decide how much of node's available segments to allocate
+--  to its left child.  The rest will be allocated to its
+--  right child
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_allocate_segs(
+    avail_segs BIGINT,            -- $1
+    left_points DOUBLE PRECISION,  -- $2
+    right_points DOUBLE PRECISION, -- $3
+    rqp_left DOUBLE PRECISION,    -- $4
+    rqp_right DOUBLE PRECISION    -- $5
+) RETURNS BIGINT AS
+$$
+    WITH p(left_penalty, right_penalty) AS (
+        SELECT
+            $4 * $2,
+            $5 * $3
+    )
+    SELECT
+        round(
+            left_penalty * $1 / (left_penalty + right_penalty)
+        )::BIGINT
+    FROM p
+$$ LANGUAGE SQL IMMUTABLE;
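+-- Illustrative example: with avail_segs=4, left_points=3000, right_points=1000
+--  and equal per-point penalties (rqp_left = rqp_right = 10.0), the left child
+--  is allocated round(30000 * 4 / 40000) = 3 segments and the right child gets
+--  the remaining 1.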
+
+-- Given internal and external point counts in left and right regions,
+-- returns optimal number of segments to allocate to left and right
+-- nodes, as well as their losses.
+--
+-- TODO: we can probably remove range-query neighbor penalty for
+--       external points if we use rtree.count() instead of
+--       rtree.intersection().  Just need to finish adding the
+--       per-cluster rtrees
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_segmentation_loss(
+    left_internal BIGINT,      -- $1
+    right_internal BIGINT,     -- $2
+    left_external BIGINT,      -- $3
+    right_external BIGINT,     -- $4
+    avail_segs BIGINT,         -- $5
+    V DOUBLE PRECISION,                -- $6
+    eps DOUBLE PRECISION,      -- $7
+    D BIGINT,                  -- $8
+    eps_bin DOUBLE PRECISION,  -- $9
+    eps_bins DOUBLE PRECISION  -- $10
+ ) RETURNS MADLIB_SCHEMA.__dbscan_losses AS
+$$
+    WITH __local AS (
+        SELECT
+            ($1 + $3)  left_points,
+            ($2 + $4)  right_points,
+            $9          left_eps_bins,
+            $10 - $9    right_eps_bins,
+            $10         all_eps_bins
+    ), penalties AS (
+        SELECT
+            MADLIB_SCHEMA.__dbscan_range_query_penalty(
+                $1 + $2,
+                MADLIB_SCHEMA.__dbscan_estimate_neighbors(
+                    $1 + $2,
+                    $6, $7, $8
+                )
+            )                                             rqp_all,
+            MADLIB_SCHEMA.__dbscan_range_query_penalty(
+                $1,
+                MADLIB_SCHEMA.__dbscan_estimate_neighbors(
+                    $1,
+                    MADLIB_SCHEMA.__dbscan_split_volume(
+                        $6,
+                        left_eps_bins,
+                        all_eps_bins
+                    ), 
+                    $7,
+                    $8
+                )
+            )                                             rqp_left,
+            MADLIB_SCHEMA.__dbscan_range_query_penalty(
+                $2,
+                MADLIB_SCHEMA.__dbscan_estimate_neighbors(
+                    $2,
+                        MADLIB_SCHEMA.__dbscan_split_volume(
+                        $6,
+                        left_eps_bins,
+                        all_eps_bins
+                    ), $7, $8
+                )
+            )                                             rqp_right
+        FROM __local
+    ), seg_alloc(left_avail, all_penalty) AS (
+        SELECT
+            MADLIB_SCHEMA.__dbscan_allocate_segs(
+                $5,
+                left_points,
+                right_points,
+                rqp_left,
+                rqp_right
+            ),
+            (left_points + right_points) * rqp_all
+        FROM __local, penalties
+    )
+    SELECT
+        CASE WHEN
+            0 < left_avail AND left_avail < $5
+        THEN
+            ROW(
+                left_avail,
+                $5 - left_avail,
+                (left_points * 1.0 / left_avail) * rqp_left,
+                (right_points * 1.0 / ($5 - left_avail)) * rqp_right
+               )::MADLIB_SCHEMA.__dbscan_losses
+        ELSE -- forget about splitting, not worth it
+            ROW(
+                $5,
+                0,
+                -- We intentionally do not divide by avail_segs,
+                --  because if we choose not to split, then there can be no
+                --  more division later on
+                (left_points + right_points) * rqp_all,
+                0.0
+               )::MADLIB_SCHEMA.__dbscan_losses
+        END
+    FROM __local, penalties, seg_alloc
+$$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.__dbscan_leaf(
+    db_rec              MADLIB_SCHEMA.__dbscan_record,
+    eps                 REAL,
+    min_samples         INT,
+    metric              TEXT,
+    num_internal_points BIGINT,
+    num_external_points BIGINT
+) RETURNS SETOF MADLIB_SCHEMA.__dbscan_record AS
+$$
+    global SD
+    args.append(SD)
+    import plpy
+
+    PythonFunctionBodyOnlyNoSchema(dbscan,dbscan)
+    WithTracebackForwarding(return dbscan.dbscan_leaf(*args))
+$$ LANGUAGE plpythonu VOLATILE
+m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
diff --git a/src/ports/postgres/modules/dbscan/test/dbscan.sql_in 
b/src/ports/postgres/modules/dbscan/test/dbscan.sql_in
index 0e79e98..5756b99 100644
--- a/src/ports/postgres/modules/dbscan/test/dbscan.sql_in
+++ b/src/ports/postgres/modules/dbscan/test/dbscan.sql_in
@@ -43,18 +43,20 @@ copy dbscan_train_data (id_in, data) FROM stdin delimiter 
'|';
 DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
 SELECT 
dbscan('dbscan_train_data','out1','id_in','data',20,4,'squared_dist_norm2','brute');
 
-SELECT assert(count(DISTINCT id_in) = 5, 'Incorrect cluster 0') FROM out1 
WHERE cluster_id = 0 and id_in=ANY(ARRAY[1,2,3,4,5]);
+SELECT assert(count(DISTINCT id) = 5, 'Incorrect cluster 0') FROM out1 WHERE 
cluster_id = 0 and id=ANY(ARRAY[1,2,3,4,5]);
 
-SELECT assert(count(DISTINCT id_in) = 4, 'Incorrect cluster 1') FROM out1 
WHERE cluster_id = 1 and id_in=ANY(ARRAY[6,7,8,9]);
+SELECT assert(count(DISTINCT id) = 4, 'Incorrect cluster 1') FROM out1 WHERE 
cluster_id = 1 and id=ANY(ARRAY[6,7,8,9]);
+
+SELECT assert(id_column = 'id_in', 'id_column field in summary table should 
have been ''id_in''') FROM out1_summary;
 
 SELECT dbscan_predict('out1', 'dbscan_train_data', 'id_in', 'data', 
'out1_predict');
 
 SELECT assert(count(DISTINCT cluster_id) = 2, 'Incorrect cluster count') FROM 
out1_predict;
 
-DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
+DROP TABLE IF EXISTS out1, out1_summary;
 SELECT 
dbscan('dbscan_train_data','out1','id_in','data',20,4,'dist_norm2','brute');
 
-DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
+DROP TABLE IF EXISTS out1, out1_summary;
 SELECT 
dbscan('dbscan_train_data','out1','id_in','data',20,4,'dist_norm1','brute');
 
 DROP TABLE IF EXISTS dbscan_train_data2;
@@ -97,16 +99,16 @@ INSERT INTO dbscan_test_data2 VALUES
 
 DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary, dbscan_predict_out;
 SELECT dbscan(
-'dbscan_train_data2',    -- source table
+'dbscan_train_data2',   -- source table
 'dbscan_result',        -- output table
 'pid',                  -- point id column
-'points',             -- data point
+'points',               -- data point
  1.75,                  -- epsilon
  4,                     -- min samples
 'dist_norm2',           -- metric
-'brute_force');               -- algorithm
+'brute_force');         -- algorithm
 
-SELECT * FROM dbscan_result ORDER BY pid;
+SELECT * FROM dbscan_result ORDER BY id;
 
 SELECT assert(count(DISTINCT cluster_id) = 3, 'Incorrect cluster count') FROM 
dbscan_result;
 
@@ -114,42 +116,82 @@ SELECT dbscan_predict('dbscan_result', 
'dbscan_test_data2', 'pid', 'points', 'db
 
 SELECT assert(count(DISTINCT cluster_id) = 3, 'Incorrect cluster count') FROM 
dbscan_predict_out;
 
-DROP TABLE IF EXISTS dbscan_train_data3;
-CREATE TABLE dbscan_train_data3 (
-id_in integer,
-data0 integer,
-data1 integer);
-copy dbscan_train_data3 (id_in, data0, data1) FROM stdin delimiter '|';
-1|1|1
-2|2|2
-3|3|3
-4|4|4
-5|4|8
-6|17|8
-7|19|8
-8|19|9
-9|19|10
-10|3|111
-11|3|112
-12|3|113
-13|8|113
+DROP TABLE IF EXISTS dbscan_cities;
+CREATE TABLE dbscan_cities (
+       city TEXT,
+       country TEXT,
+       lat DOUBLE PRECISION,
+       lon DOUBLE PRECISION,
+       PRIMARY KEY (city, country)
+);
+COPY dbscan_cities (city, country, lat, lon) FROM stdin csv header;
+city,country,lat,lon
+Bangkok,Thailand,13.7563,100.5018
+Athens,Greece,37.9838,23.7275
+Beijing,China,39.9042,116.4074
+St. Petersburg,Russia,59.9311,30.3609
+Barcelona,Spain,41.3874,2.1686
+Tehran,Iran,35.6892,51.3890
+Bristol,England,51.4545,-2.5879
+São Paulo,Brazil,-23.5558,-46.6396
+Berlin,Germany,52.5200,13.4050
+New Delhi,India,28.6139,77.2090
+Sydney,Australia,-34,151
+Bogotá,Colombia,4,-74
+Kingston,Jamaica,17,-76
+Rome,Italy,41,12
+Bombay,India,19,72
+Cape Town,South Africa,-33,22
+Cairo,Egypt,30,31
+Guayaquil,Ecuador,-2,-79
+Jakarta,Indonesia,-6,106
+Hamburg,Germany,53.5511,9.9937
+Hong Kong,China,22,114
+Bordeaux,France,44,0
+Nairobi,Kenya,-1.2921,36.8219
+Osaka,Japan,34,135
 \.
 
-DROP TABLE IF EXISTS out1, out1_summary;
-SELECT 
dbscan('dbscan_train_data3','out1','id_in','ARRAY[data0,data1]',20,4,'squared_dist_norm2','brute');
+\set unit_3d_point_expr ARRAY[COS(radians(lat))*COS(radians(lon)), COS(radians(lat))*SIN(radians(lon)), SIN(radians(lat))]
+\set id_expr '(''x'' || substr(md5(city || '','' || country),1,16))::BIT(64)::BIGINT'
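+-- The ARRAY expression above projects each (lat, lon) pair onto the unit
+-- sphere, so with the dist_angle metric the distance between two cities is
+-- their central angle; eps = radians(22) then groups cities within roughly
+-- 22 degrees of arc of each other.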
 
-DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary, dbscan_predict_out;
+-- Test that id and point params may be passed in as expressions
+DROP TABLE IF EXISTS dbscan_cities_out, dbscan_cities_out_summary;
+SELECT dbscan('dbscan_cities',
+       'dbscan_cities_out',
+       :'id_expr',
+       :'unit_3d_point_expr',
+       radians(22), -- eps = 22 degrees
+       3,          -- min_samples = 3
+       'dist_angle',
+       'brute'
+);
+
+SELECT assert(
+       id_column = $$('x' || substr(md5(city || ',' || country),1,16))::BIT(64)::BIGINT$$,
+       'id expression passed in does not match expression saved in summary table: ' || id_column
+) FROM dbscan_cities_out_summary;
+SELECT assert(
+       expr_point = :'unit_3d_point_expr',
+       'point expression passed in does not match expression saved in summary table: ' || expr_point
+) FROM dbscan_cities_out_summary; -- verify point expression saved in summary
+SELECT assert(COUNT(*) = 18, 'Wrong number of rows returned.  Expected: 18, Actual: ' || COUNT(*)) FROM dbscan_cities_out;
+SELECT assert(COUNT(DISTINCT cluster_id) = 3, 'Wrong number of clusters found.  Expected: ' || 3 || ', Actual: ' || COUNT(DISTINCT cluster_id)) FROM dbscan_cities_out;
+SELECT assert(COUNT(*) = 15, 'Wrong number of core points found.  Expected: ' || 15 || ', Actual: ' || COUNT(*)) FROM dbscan_cities_out WHERE is_core_point;
+SELECT assert(
+       ARRAY(SELECT city FROM dbscan_cities
+             JOIN (SELECT id FROM dbscan_cities_out
+                   WHERE cluster_id IN (SELECT cluster_id FROM dbscan_cities_out b
+                                        GROUP BY cluster_id HAVING COUNT(*) < 5)) x
+             ON id = :id_expr ORDER BY id) = ARRAY['Bogotá','Kingston','Guayaquil'],
+       'Wrong cities in smallest cluster, expected: Bogotá, Kingston, Guayaquil');
+
+DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary;
 SELECT dbscan(
-'dbscan_train_data2',    -- source table
+'dbscan_train_data2',   -- source table
 'dbscan_result',        -- output table
 'pid',                  -- point id column
-'points',             -- data point
+'points',               -- data point
  1.75,                  -- epsilon
  4,                     -- min samples
 'dist_norm2',           -- metric
-'brute_force');               -- algorithm
+'brute_force');         -- algorithm
 
--- Test kd-tree
+-- Test optimized dbscan
 
 DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary;
 SELECT dbscan(
@@ -160,17 +202,16 @@ SELECT dbscan(
  1.75,                  -- epsilon
  4,                     -- min samples
 'dist_norm2',           -- metric
-'kd_tree',              -- algorithm
+'optimized',            -- algorithm
 2);                     -- depth
 
-SELECT * FROM dbscan_result ORDER BY pid;
+SELECT * FROM dbscan_result ORDER BY id;
 
-
-CREATE TABLE dbscan_kd_test_data (
+CREATE TABLE dbscan_opt_test_data (
     row_id integer,
     row_vec double precision[]
 );
-COPY dbscan_kd_test_data (row_id, row_vec) FROM stdin DELIMITER '|';
+COPY dbscan_opt_test_data (row_id, row_vec) FROM stdin DELIMITER '|';
 1 | {-123.932588658681,312.674262022751,24.5140517556683}
 2 | {-1011.71837586596,294.857038270306,-596.339561040807}
 3 | {51.8496080463916,-392.173152861615,188.5097494329}
@@ -1174,23 +1215,24 @@ COPY dbscan_kd_test_data (row_id, row_vec) FROM stdin 
DELIMITER '|';
 \.
 
 DROP TABLE IF EXISTS dbscan_result, dbscan_result_summary;
-SELECT dbscan('dbscan_kd_test_data','dbscan_result','row_id','row_vec',50, 3, 
'dist_norm2', 'br');
+SELECT dbscan('dbscan_opt_test_data','dbscan_result','row_id','row_vec',50, 3, 
'dist_norm2', 'br');
 
-DROP TABLE IF EXISTS dbscan_result_kd, dbscan_result_kd_summary;
-SELECT dbscan('dbscan_kd_test_data','dbscan_result_kd','row_id','row_vec',50, 
3, 'dist_norm2', 'kd', 2);
+DROP TABLE IF EXISTS dbscan_result_opt, dbscan_result_opt_summary;
+SELECT 
dbscan('dbscan_opt_test_data','dbscan_result_opt','row_id','row_vec',50, 3, 
'dist_norm2', 'optimized', 2);
 
-SELECT assert(count(row_id) = 50 AND count(distinct cluster_id) = 11, 
'Incorrect cluster counts for brute')
+SELECT assert(count(id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect 
cluster counts for brute')
 FROM dbscan_result;
 
-SELECT assert(count(row_id) = 50 AND count(distinct cluster_id) = 11, 
'Incorrect cluster counts for kd')
-FROM dbscan_result_kd;
+SELECT assert(count(id) = 50 AND count(distinct cluster_id) = 11, 'Incorrect 
cluster counts for optimized dbscan')
+FROM dbscan_result_opt;
 
 CREATE TABLE temp_result AS
-SELECT row_id
+SELECT id
 FROM dbscan_result
-WHERE cluster_id = (SELECT cluster_id FROM dbscan_result WHERE row_id = 4);
+WHERE cluster_id = (SELECT cluster_id FROM dbscan_result WHERE id = 4);
 
 SELECT assert(count(*) = 6, 'Clusters do not match')
-FROM dbscan_result_kd
-WHERE cluster_id = (SELECT cluster_id FROM dbscan_result_kd WHERE row_id = 4)
-      AND row_id IN (SELECT row_id FROM temp_result);
+FROM dbscan_result_opt
+WHERE cluster_id = (SELECT cluster_id FROM dbscan_result_opt WHERE id = 4)
+      AND id IN (SELECT id FROM temp_result);
+
diff --git 
a/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in 
b/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
index 1aa0681..3c71e73 100644
--- a/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
+++ b/src/ports/postgres/modules/dbscan/test/unit_tests/test_dbscan.py_in
@@ -19,18 +19,18 @@
 
 import sys
 import numpy as np
+from collections import Counter
 from os import path
-
-# Add modules to the pythonpath.
-sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))
-sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
-
 import unittest
 from mock import *
 import plpy_mock as plpy
 
 m4_changequote(`<!', `!>')
 
+# Add modules to the pythonpath.
+sys.path.append(path.join(path.dirname(path.abspath(__file__)), '../../..'))
+sys.path.append(path.join(path.dirname(path.abspath(__file__)), '../..'))
+
 class DBSCANTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -52,27 +52,343 @@ class DBSCANTestCase(unittest.TestCase):
             'output_table' : "output",
             'id_column' : "id_in",
             'expr_point' : "data",
-            'eps' : 20,
-            'min_samples' : 4,
-            'metric' : 'squared_dist_norm2',
+            'eps' : 3.0,
+            'min_samples' : 1,
+            'metric' : 'dist_norm2',
             'algorithm' : 'brute',
-            'depth': 2
+            'max_segmentation_depth': 5
         }
         import dbscan.dbscan
         self.module = dbscan.dbscan
+        self.module.pstats = self.module.dbscan_perfstats()
+
+        dbscan_record = self.module.dbscan_record
+        dbscan_internal_record = self.module.dbscan_internal_record
+        dbscan_external_record = self.module.dbscan_external_record
+
+        test_ids = [ 5555, 0, 66, 21, 43, 11 ]
+        points = [ [ 5.2, 8.9, -6.1, 0.0 ], [ 0.0, -1.9, 5.6, 1.4 ],
+                   [ 0.0, 0.0, 0.0, 0.0  ], [-1.8,  0.0, 2.0, -1.3],
+                   [ 0.1, -0.1, 0.0, 0.5  ], [-1.0,  2.0, 2.1, -1.4] ]
+        # For reference, here is the table of distances between these test points:
+        #
+        # In [0]: for i in range(len(points)):
+        # ...:     print(np.linalg.norm(points - points[i], ord=2, axis=1))
+        # ...:
+        # [ 0.         16.80862874 11.97747887 13.98248905 12.01956738 12.45993579]
+        # [16.80862874  0.          6.07700584  5.20576603  5.95147041  6.02494813]
+        # [11.97747887  6.07700584  0.          2.98831056  0.51961524  3.37194306]
+        # [13.98248905  5.20576603  2.98831056  0.          3.29545141  2.15870331]
+        # [12.01956738  5.95147041  0.51961524  3.29545141  0.          3.69323706]
+        # [12.45993579  6.02494813  3.37194306  2.15870331  3.69323706  0.        ]
+        #
+        #   With eps=3.0, the first 2 points have 0 neighbors:
+        #      id=5555 and id=0
+        #   The last two points each have 1 neighbor:
+        #      id=66 is the sole neighbor of id=43
+        #      id=21 is the sole neighbor of id=11
+        #   And the middle two each have 2 neighbors:
+        #      id=66 and id=21 neighbor each other, plus the neighbor listed above
+        #  If min_samples <= 3, then 66, 21, 43, and 11 form a cluster of 4 points,
+        #      where all 4 should be labelled as core for min_samples = 2, but only
+        #      points 66 and 21 are core for min_samples = 3.
+        #  For min_samples = 1, all 6 points are core, so there are 3 clusters:
+        #      the 2 isolated single-point clusters plus the 4-point cluster.
+        #  For min_samples > 3 there are no core points, so all points are noise.
+        #
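+        #  (A compact way to re-derive the neighbor counts used below --
+        #   illustrative only, not executed by the test:
+        #       dists = np.linalg.norm(np.asarray(points)[:, None] - points, axis=2)
+        #       (dists <= 3.0).sum(axis=1)   # -> [1, 1, 3, 3, 2, 2], counting self
+        #   )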
+
+        # Now we assign each point to one of 3 leaves:
+        leaf_ids = [ 1, 2, 6, 6, 1, 2 ]
+
+        # And also add it as an external point to a different leaf:
+        dist_ids = [ 6, 1, 1, 2, 2, 1 ]  # only used for external recs
+
+        # row_counts metadata for each dist_id:
+        #   dist_id=1: 2 internal points, 3 external ( from leaf_ids 2,6,2 )
+        #   dist_id=2: 2 internal points, 2 external ( from leaf_ids 1,6 )
+        #   dist_id=6: 2 internal points, 1 external ( from leaf_id 1 )
+        self.internal_rowcounts = Counter(leaf_ids)
+        self.external_rowcounts = Counter(dist_ids)
+
+        self.eps = 3.0
+        self.metric = 'dist_norm2'
+
+        db_recs = []
+        db_dicts = []
+        for i in range(len(test_ids)):
+            internal_rec = { 'id' : test_ids[i], 'point' : points[i], 
'leaf_id' : leaf_ids[i], 'dist_id' : leaf_ids[i] }
+            db_dicts.append(internal_rec)
+            db_recs.append(dbscan_record.from_dict(internal_rec, self.eps))
+            external_rec = { 'id' : test_ids[i], 'point' : points[i], 
'leaf_id' : leaf_ids[i] , 'dist_id' : dist_ids[i] }
+            db_dicts.append(external_rec)
+            db_recs.append(dbscan_record.from_dict(external_rec, self.eps))
+
+        self.test_ids = test_ids
+        self.points = points
+        self.leaf_ids = leaf_ids
+        self.dist_ids = dist_ids
+        self.db_dicts = db_dicts
+        self.db_recs = db_recs
 
     def tearDown(self):
         self.module_patcher.stop()
 
-    def test_validate_dbscan(self):
+    def test_dbscan_record(self):
+        dbscan_record = self.module.dbscan_record
+        dbscan_internal_record = self.module.dbscan_internal_record
+        dbscan_external_record = self.module.dbscan_external_record
+        label = self.module.label
+        db_recs = self.db_recs
+        db_dicts = self.db_dicts
+        test_ids = self.test_ids
+        points = self.points
+        leaf_ids = self.leaf_ids
+        dist_ids = self.dist_ids
+        eps = self.eps
+        n = len(test_ids)
+
+        # Test creating dbscan_records from dict input
+        j = 0
+        for i in range(n):
+            for t in ['internal', 'external']:
+                db_rec = db_recs[j]
+                self.assertEqual( db_rec.id, test_ids[i] )
+                self.assertEqual( db_rec.point, points[i] )
+                self.assertEqual( db_rec.leaf_id, leaf_ids[i] )
+                self.assertEqual( db_rec.cluster_id, label.UNLABELLED )
+                self.assertFalse( db_rec.is_core_point )
+                if isinstance(db_rec, dbscan_internal_record):
+                    self.assertEqual( db_rec.dist_id, leaf_ids[i] )
+                elif isinstance(db_rec, dbscan_external_record):
+                    self.assertEqual( db_rec.dist_id, dist_ids[i] )
+                else:
+                    self.fail("dbscan_record.from_dict() returned instance of unrecognized class {}".format(type(db_rec)))
+                j += 1
+
+        db_rec = dbscan_record.from_dict(db_recs[0].__dict__, eps)     # 0 
neighbors + 1 self
+        min_samples = 1                 #  For min_samples = 1, every point is 
a core point
+        self.assertFalse(db_rec.not_core_point(min_samples), "This should be a 
core point")
+        min_samples = 2                 #  For min_samples = 2, need at least 
1 neighbor to be core
+        self.assertTrue(db_rec.not_core_point(min_samples), "This should NOT 
be a core point")
+
+        min_samples = 6
+        db_rec._internal_neighbor_count = 4
+        db_rec._external_neighbors = []   #  4 + 0 neighbors + 1 self < 
min_samples=6
+        self.assertTrue(db_rec.not_core_point(min_samples), "This should be 
NOT be a core point")
+        db_rec._internal_neighbor_count = 2
+        db_rec._external_neighbors = [5, 8, 9]  # 2 + 3 = 5 neighbors + 1 for 
self >= min_samples=6
+        self.assertFalse(db_rec.not_core_point(min_samples), "This should be a 
core point")
+
+        # Test cloning of an internal record
+        #   (extra fields added should all have been deleted)
+        db_rec = dbscan_record(db_rec)
+        actual = db_dicts[0]
+        actual['cluster_id'] = 0
+        actual['is_core_point'] = True  # verify cloned as core point
+        self.assertEqual(db_rec.__dict__, actual)
+
+        # Test cloning of an external record
+        db_rec = dbscan_record(db_recs[1])
+        actual = db_dicts[1]
+        actual['cluster_id'] = 0
+        actual['is_core_point'] = False  # verify cloned as non-core point
+        self.assertEqual(db_rec.__dict__, actual)
+
+    def test_leaf_storage(self):
+        # Add some internal and external points, mark some as part of
+        #  clusters, test range queries and assert that:
+        #    1. the right points are returned
+        #    2. neighbor counts and possible_border_points are updated properly
+        #    3. search point has been removed from rtree
+        #
+        # For the purposes of testing LeafStorage, we ignore leaf_id &
+        #  dist_id, adding all 8 records to the same leaf.
+
+        eps = self.eps
+        min_samples = 3
+        metric = self.metric
+        db_recs = self.db_recs
+
+        db_rec = db_recs[0]
+        LeafStorage = self.module.LeafStorage
+
+        leaf = LeafStorage( 4,
+                            np.float64,
+                            eps,
+                            metric,
+                            min_samples
+                          )
+
+        for db_rec in db_recs:
+            leaf.add_point(db_rec)
+
+        neighbor_counts = [ 1, 1, 3, 3, 2, 2 ]  # Number of neighbors (including self)
+                                                #  of each of the 6 points for eps=3.0
+
+        returned_neighbors = [ [], [], [43, 21], [11], [], [] ]
+
+        core_points = { 21, 66, 43, 11 }     # Points expected to be marked core for
+                                             #  min_samples = 3.  Every point is added
+                                             #  twice here (1 internal + 1 external),
+                                             #  so each count is doubled; min_samples = 3
+                                             #  therefore requires at least 1 actual
+                                             #  neighbor besides the point itself.
+
+        # Test calling range_query() on external points
+        #   External points must be queried first, so that all of the
+        #   _ext_neighbors lists get updated before we test the internal
+        #   points
+        for i in range(len(neighbor_counts)):
+            db_rec = db_recs[2*i+1]
+            neighbors = leaf.range_query(db_rec)
+            self.assertEqual( len(neighbors), neighbor_counts[i])
+
+        # Test calling range_query() on internal points
+        for i in range(len(neighbor_counts)):
+            db_rec = db_recs[2*i]
+            neighbors = leaf.range_query(db_rec)
+            self.assertEqual( len(db_rec._external_neighbors), 
neighbor_counts[i] )
+            self.assertEqual( db_rec._internal_neighbor_count, 
neighbor_counts[i] - 1 )
+            self.assertItemsEqual( neighbors, returned_neighbors[i],
+                "Expected range_query({}) to return neighbors {}, but returned 
{}".format(
+                    db_rec.id, returned_neighbors[i], neighbors
+                )
+            )
+            self.assertEqual( db_rec.is_core_point, db_rec.id in core_points,
+                "Unexpected is_core_point={} for internal point id={} after 
range_query()".format(
+                    db_rec.is_core_point, db_rec.id
+                )
+            )
+
+    # Helper function for test_dbscan_leaf()
+    #   Builds a dictionary of expected dbscan output records
+    #
+    #   Inputs:
+    #     ret_ids:      list of expected ids to be returned
+    #     cluster_ids:  list of expected cluster labels, in same order
+    #     external_flags: list of booleans indicating which records are external
+    #     core_points:  set of ids expected to be labeled as core points
+    #
+    #   Output:  A nested dictionary of output results indexed at the first
+    #            level by point ids, and at the second level by 'internal' vs 'external'
+    #
+    #            There should be at most 1 internal row returned for each internal id,
+    #            so expected_res[id]['internal'] is a single dbscan_record
+    #
+    #            Since dbscan_leaf() should return multiple rows for some external points,
+    #            expected_res[id]['external'] is a list (stack) of the expected external
+    #            dbscan_records for each id, in reverse order of how they are expected
+    #            to be returned.  The 0th element of the list for each id will be None,
+    #            so that if the number of rows returned for any id exceeds the expected
+    #            number, the first unexpected row will be printed properly by assertEqual,
+    #            instead of causing an Exception while trying to access an empty list
+    #            (after all other elements have been popped from the stack, as they are
+    #            compared).
+    #
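+    #   For example, an id whose external record lands near exactly one cluster
+    #   of another leaf would look roughly like:
+    #     expected_res[66] = { 'internal': <dbscan_record for id 66>,
+    #                          'external': [ None, <external dbscan_record for id 66> ] }
+    #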
+    def build_expected_dbscan_leaf_output(self, ret_ids, cluster_ids,
+                                          external_flags, core_points):
+        test_ids = self.test_ids
+        db_recs = self.db_recs
+        dbscan_record = self.module.dbscan_record
+        expected_res = {}
+        id_lookup = { test_ids[i] : i for i in range(len(test_ids)) }
+
+        for i, id in enumerate(ret_ids):
+            j = id_lookup[id]
+            if not id in expected_res:
+                entry = dict()
+                entry['external'] = [ None ]
+                entry['internal'] = None
+                expected_res[id] = entry
+
+            if external_flags[i]:
+                rec = dbscan_record(db_recs[2*j+1])  # start with original 
external rec
+                # Modify record with expected cluster_id
+                rec.cluster_id = cluster_ids[i]
+                rec.is_core_point = False  # external records should never be 
marked as core
+                entry['external'].append(rec)
+            else:
+                rec = dbscan_record(db_recs[2*j])  # start with original 
internal rec
+                # Modify record with expected cluster_id & is_core_point flag
+                rec.cluster_id = cluster_ids[i]
+                rec.is_core_point = id in core_points
+                entry['internal'] = rec
+
+        return expected_res
+
+    def test_dbscan_leaf(self):
+        # Test dbscan_leaf() by calling it with some simple test
+        # data.  Ensure that:
+        #     1. 1 record is returned for each non-noise point (especially, 
non-core points)
+        #        and a record for each cluster each external point is near.
+        #        eg. if 1 external point is near 2 clusters, and another is
+        #        near 1 cluster, then 3 records should be returned (+ internal 
records)
+        #     2. SD is properly removed if an exception is hit, or if last row
+        #     3. External neighbors are included in deciding whether each 
internal point
+        #        is a core_point.  ie. If an external neighbor brings the 
total # of
+        #        neighbors (including self) for an internal point up to 
min_samples, it
+        #        should be marked a core point.
+        #     4. cluster_id's are assigned correctly, and leaf_id's & 
dist_id's are unchanged
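+        #        (For example, in the eps=5.5 case below, id=21 is expected three
+        #        times: once as an internal row and twice as external rows, because
+        #        its external copy falls within eps of two different clusters.)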
+
+        self.maxDiff = 2000
+        dbscan_record = self.module.dbscan_record
+        eps = self.eps
+        metric = self.metric
+        min_samples = 1
+
+        db_recs = self.db_recs
+        SD = dict()
+
+        # Call dbscan_leaf() with eps=3.0
+        res = []
+        for db_rec in db_recs:
+            num_internal_points = self.internal_rowcounts[db_rec.dist_id]
+            num_external_points = self.external_rowcounts[db_rec.dist_id]
+            state = self.module._dbscan_leaf(db_rec.__dict__, eps, 
min_samples, metric, num_internal_points,
+                                             num_external_points, SD)
+            res += list(state)
+
+        self.assertEqual(SD, {}, "SD was not properly cleaned up after calls 
to dbscan_leaf() complete")
+
+        # Expected results for eps=3.0, min_samples=1
+        ret_ids =        [ 5555, 0, 66, 66, 21, 21, 43, 11 ]
+        cluster_ids =    [    2, 1,  1,  1,  1,  2, 1,  2 ]
+        external_flags = [ False, False, False, True, False, True, False, 
False ]
+        core_points = { 5555, 0, 66, 21, 43, 11 } #  All internal points 
should be marked core
+        expected_res = self.build_expected_dbscan_leaf_output(ret_ids, 
cluster_ids, external_flags,
+                                                              core_points)
+        for output_rec in res:
+            if output_rec.leaf_id == output_rec.dist_id:
+                self.assertEqual(output_rec, 
expected_res[output_rec.id]['internal'])
+            else:
+                self.assertEqual(output_rec, 
expected_res[output_rec.id]['external'].pop())
+
+        self.assertEqual(len(res), len(ret_ids),
+            "Expected {} rows from dbscan_leaf(), but got {}".format(
+                len(ret_ids), len(res)))
+
+        eps=5.5  # Call dbscan_leaf() again with different eps
+        res = []
+        for db_rec in db_recs:
+            num_internal_points = self.internal_rowcounts[db_rec.dist_id]
+            num_external_points = self.external_rowcounts[db_rec.dist_id]
+            state = self.module._dbscan_leaf(db_rec.__dict__, eps, 
min_samples, metric, num_internal_points,
+                                             num_external_points, SD)
+            res += list(state)
 
-        self.module.input_tbl_valid = Mock()
-        self.module.output_tbl_valid = Mock()
-        self.module.cols_in_tbl_valid = Mock()
-        self.module.is_var_valid = Mock(return_value = True)
-        self.module.get_expr_type = Mock(return_value = 'double precision[]')
+        self.assertEqual(SD, {}, "SD was not properly cleaned up after calls 
to dbscan_leaf() complete")
 
-        self.module._validate_dbscan(**self.default_dict)
+        # Expected results for eps=5.5, min_samples=1
+        ret_ids =  [ 5555, 0, 66, 66, 21, 21, 21, 43, 43, 11, 11 ]
+        cluster_ids = [ 2, 1,  1,  1,  1,  2,  1,  1,  2,  2,  1 ]
+        external_flags = [ False, False, False, True, False, True, True, 
False, True, False, True ]
+        core_points = { 5555, 0, 66, 21, 43, 11 }
+        expected_res = self.build_expected_dbscan_leaf_output(ret_ids, 
cluster_ids, external_flags,
+                                                              core_points)
+        for output_rec in res:
+            if output_rec.leaf_id == output_rec.dist_id:
+                self.assertEqual(output_rec, 
expected_res[output_rec.id]['internal'])
+            else:
+                self.assertEqual(output_rec, 
expected_res[output_rec.id]['external'].pop())
 
 if __name__ == '__main__':
     unittest.main()
