reductionista commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r462433675



##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,297 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
 
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT  __t1__.{id_column} AS __src__,
-                        __t2__.{id_column} AS __dest__,
-                        {schema_madlib}.{metric}(
-                            __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT {id_column}, {expr_point} AS __expr_point__
+            FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
+        expr_point = '__expr_point__'
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_view, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            find_core_points_table = unique_string(desp='find_core_points_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                find_core_points_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+            CREATE TABLE {find_core_points_table} AS
+            SELECT __leaf_id__,
+                   {schema_madlib}.find_core_points( {id_column},
+                                               {expr_point}::DOUBLE PRECISION[],
+                                               {eps},
+                                               {min_samples},
+                                               '{metric}',
+                                               ARRAY{rt_counts_list},
+                                               __leaf_id__
+                                               )
+            FROM {cur_source_table} GROUP BY __leaf_id__
+            {dist_leaf_sql}
+            """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+            CREATE TABLE {rt_edge_table} AS
+            SELECT (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__
+            FROM (
+                SELECT {schema_madlib}.unpack_2d(find_core_points) AS unpacked_2d
+                FROM {find_core_points_table}
+                ) q1
+            WHERE (unpacked_2d).src NOT IN (SELECT {id_column} FROM {border_table1})
+            {dist_src_sql}
+            """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+                CREATE TABLE {rt_core_points_table} AS
+                SELECT DISTINCT(__src__) AS {id_column} FROM {rt_edge_table}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            # # Start border
+            sql = """
+                CREATE TABLE {border_edge_table} AS
+                SELECT __src__, __dest__ FROM (
+                    SELECT  __t1__.{id_column} AS __src__,
+                            __t2__.{id_column} AS __dest__,
+                            {schema_madlib}.{metric}(
+                                __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
+                    FROM {border_table1} AS __t1__, {border_table2} AS __t2__)q1
+                WHERE __dist__ < {eps}
+                """.format(**locals())
+            plpy.execute(sql)

Review comment:
       This query is an O(B^2) operation, where B is the number of boundary points.  It is the same as the brute-force algorithm, but with N replaced by B.  At first I thought this wouldn't be too bad, because it seems like B would be very small compared to N.  Unfortunately, that reasoning only works well for a low number of dimensions.  As the number of dimensions gets higher, the hypervolume of the boundary is not that different from the overall hypervolume.
   Here's a rough calculation of the runtime... I think this is the bottleneck for the whole algorithm.
   
   D = dimension of space
   N = number of points
   S = number of segments
   eps = epsilon
   L = linear size (side length) of the region containing all of the points
   
   Hypervolume of space containing all points = V = L^D
   Hypervolume of boundaries = O(eps * L^(D-1))
   Hypervolume of boundaries between boundaries = O(eps^2 * L^(D-2))
   ...
   
   Avg density of points = n = N/V = N/L^D
   
   Points in boundary regions = B = O(eps * n * L^(D-1)) = (eps/L) * O(N)
   
   Current runtime:  (1/S) * (eps/L)^2 * O(N^2)
   
   The goal should be to do this in O((N/S) * log(N)).  I think we can achieve that if we re-run the `find_core_points()` AGGREGATE on the boundary points, then on the boundaries of the boundaries, etc., until we reach 1-D boundaries.
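   To make the scaling concrete, here's a tiny standalone sketch (not part of the patch) that plugs some arbitrary example numbers into the estimates above; N, S, L, and eps below are made up purely for illustration:
   ```python
   from math import log

   # Illustrative only: arbitrary example values, not measurements from the PR.
   N = 10 ** 6     # number of points
   S = 16          # number of segments
   L = 1.0         # side length of the region containing the points
   eps = 0.05      # DBSCAN epsilon

   # Points expected in the boundary regions, per the estimate above:
   # B = O(eps * n * L^(D-1)) = (eps/L) * O(N)   (constant factors ignored)
   B = (eps / L) * N

   current_cost = (1.0 / S) * B * B     # (1/S) * ((eps/L) * N)^2
   target_cost = (N / S) * log(N, 2)    # O((N/S) * log N)

   print("B ~ %.1e   current boundary cost ~ %.1e   target ~ %.1e"
         % (B, current_cost, target_cost))
   # With these numbers: B ~ 5.0e4, current ~ 1.6e8, target ~ 1.2e6,
   # i.e. the O(B^2) boundary join dominates even though B is only 5% of N.
   ```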

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,297 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
 
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT  __t1__.{id_column} AS __src__,
-                        __t2__.{id_column} AS __dest__,
-                        {schema_madlib}.{metric}(
-                            __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT {id_column}, {expr_point} AS __expr_point__
+            FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
+        expr_point = '__expr_point__'
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_view, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "

Review comment:
       I think we can remove this line; `kd_join_clause` is unused.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,297 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
 
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT  __t1__.{id_column} AS __src__,
-                        __t2__.{id_column} AS __dest__,
-                        {schema_madlib}.{metric}(
-                            __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT {id_column}, {expr_point} AS __expr_point__
+            FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
+        expr_point = '__expr_point__'
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_view, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            find_core_points_table = unique_string(desp='find_core_points_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                find_core_points_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+            CREATE TABLE {find_core_points_table} AS
+            SELECT __leaf_id__,
+                   {schema_madlib}.find_core_points( {id_column},
+                                               {expr_point}::DOUBLE PRECISION[],
+                                               {eps},
+                                               {min_samples},
+                                               '{metric}',
+                                               ARRAY{rt_counts_list},
+                                               __leaf_id__
+                                               )
+            FROM {cur_source_table} GROUP BY __leaf_id__
+            {dist_leaf_sql}
+            """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+            CREATE TABLE {rt_edge_table} AS
+            SELECT (unpacked_2d).src AS __src__, (unpacked_2d).dest AS __dest__
+            FROM (
+                SELECT {schema_madlib}.unpack_2d(find_core_points) AS unpacked_2d
+                FROM {find_core_points_table}
+                ) q1
+            WHERE (unpacked_2d).src NOT IN (SELECT {id_column} FROM {border_table1})
+            {dist_src_sql}
+            """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+                CREATE TABLE {rt_core_points_table} AS
+                SELECT DISTINCT(__src__) AS {id_column} FROM {rt_edge_table}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            # # Start border
+            sql = """
+                CREATE TABLE {border_edge_table} AS
+                SELECT __src__, __dest__ FROM (
+                    SELECT  __t1__.{id_column} AS __src__,
+                            __t2__.{id_column} AS __dest__,
+                            {schema_madlib}.{metric}(
+                                __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
+                    FROM {border_table1} AS __t1__, {border_table2} AS __t2__)q1
+                WHERE __dist__ < {eps}
+                """.format(**locals())
+            plpy.execute(sql)
+
+            sql = """
+                CREATE TABLE {border_core_points_table} AS
+                SELECT * FROM (
+                    SELECT __src__ AS {id_column}, count(*) AS __count__
+                    FROM {border_edge_table} GROUP BY __src__) q1
+                WHERE __count__ >= {min_samples}
+                {dist_id_sql}

Review comment:
       The HAVING keyword is meant for situations like this; it looks a bit cleaner, and I think it may also reduce the number of slices needed by 1.  Note that HAVING has to filter on `count(*)` rather than the `__count__` output alias, which also lets us drop the wrapping subquery:
   ```
               CREATE TABLE {border_core_points_table} AS
               SELECT __src__ AS {id_column}, count(*) AS __count__
               FROM {border_edge_table} GROUP BY __src__
               HAVING count(*) >= {min_samples}
               {dist_id_sql}
   ```
   (The same applies to some other similar queries, but I'm not sure how many of them will still look the same after other things get refactored.)

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,297 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
 
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT  __t1__.{id_column} AS __src__,
-                        __t2__.{id_column} AS __dest__,
-                        {schema_madlib}.{metric}(
-                            __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT {id_column}, {expr_point} AS __expr_point__
+            FROM {source_table}
             """.format(**locals())
         plpy.execute(sql)
+        expr_point = '__expr_point__'
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_view, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1

Review comment:
       `leaf_id_start` is also unused and can be removed.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_table, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            rtree_step_table = unique_string(desp='rtree_step_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                rtree_step_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+            CREATE TABLE {rtree_step_table} AS
+            SELECT __leaf_id__,
+                   {schema_madlib}.rtree_step( {id_column},
+                                               {expr_point},
+                                               {eps},
+                                               {min_samples},
+                                               '{metric}',
+                                               ARRAY{rt_counts_list},
+                                               __leaf_id__
+                                               )
+            FROM {cur_source_table} GROUP BY __leaf_id__

Review comment:
       I think as long as the leaves are all subsets of the segments, this should be okay.  We're just pre-calculating, with the kd-tree, some of the distances that we would otherwise have to calculate later when the boundary edge table is constructed.  I think the kd-tree would still be useful; we'd just be shifting more of the work into the `find_core_points()` aggregate, saving work later.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            idx.add(key1,value1+value1,key1)

Review comment:
       We already save the points in SD between rows.  This would just be saving them in an rtree index structure instead of a plain list/dict structure.  (Or, if we also need the list, we could save both.)  No pickling is necessary, since the data never gets transferred between segments or written out to disk.
   
   However, after reading about this "streaming optimization" for rtree:
   https://rtree.readthedocs.io/en/latest/performance.html#use-stream-loading
   
   I'm wondering if bulk loading is already optimized inside rtree. If that's the case, then it's probably better to just leave it as is.
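   For reference, here's a minimal sketch (not from the patch) of what stream loading could look like for the per-leaf index, assuming the same degenerate point-as-box encoding (`coords + coords`) that the transition function already uses; `build_index_streamed` and its arguments are hypothetical names just for illustration:
   ```python
   from rtree import index

   def build_index_streamed(data, dimension):
       """Bulk-load an rtree index from a dict of {id: coords} using the
       stream-loading constructor instead of repeated .add() calls."""
       p = index.Property()
       p.dimension = dimension

       def gen():
           for key, coords in data.items():
               # A point is stored as a degenerate box: mins == maxs.
               yield (key, coords + coords, key)

       # Passing a generator to the constructor takes rtree's bulk
       # (stream) loading path described in the performance docs.
       return index.Index(gen(), properties=p)
   ```
   Whether this actually beats repeated `idx.add()` calls for typical leaf sizes would need a quick benchmark.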




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

