reductionista commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r461300564



##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree

Review comment:
       I don't see `Rtree` used anywhere, I think we can get rid of this import.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, 
source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, 
n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            idx.add(key1,value1+value1,key1)
+
+        for key1, value1 in data.items():
+
+            v1 = []
+            v2 = []
+            for dim in value1:

Review comment:
       `dim` sounds more like the size of something; I'd go with `x`, sounds 
more like a coordinate.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, 
source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, 
n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            idx.add(key1,value1+value1,key1)

Review comment:
       Maybe move this outside of the `if`?  May as well build the rtree row by 
row as they come in.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, 
source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, 
n_rows, leaf_id, **kwargs):
+
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)

Review comment:
       I wish we could call this variable `point` rather than `expr_points`, as 
it's an array of numbers representing a single point rather than an expression 
or a list of multiple points or expressions.  But I guess this is due to the 
sql interface to `rtree_step()`, where it does get passed a column of points or 
an expression of columns.
   
   Maybe we can change the name to `point` just for the transition function, 
and keep it as-is for the aggregate function?  Or would that be too confusing 
having them named differently?  I can't recall if we've done that elsewhere, 
but I think so.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, 
source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, 
n_rows, leaf_id, **kwargs):

Review comment:
       Some general comments about the `rtree_transition()` function:
   
   This is functionally equivalent to running the dbscan algorithm on one 
leaf's worth of points.  But it uses a different algorithm in that, it explores 
the points in a random order instead of a systematic order (moving from one 
neighbor to the next in a breadth-first-search). 
   
   It seems like they would have similar runtime behavior in a typical case, 
but I'm a bit worried there may be situations where they perform very 
differently.
   
   Somewhat related to that:  it seems like it would be better to prune the 
edges we won't need later now, rather than later, so we have less data to send 
back to master.  Maybe there's a reason I'm missing why this aggregate function 
and the snowflake aggregate function have to be separate, but I think they 
ought to be combined if possible.  Being able to do lookups in the rtree might 
also help the snowflake stage run faster.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table, 
id_column, expr_point, eps
             """.format(**locals())
         plpy.execute(sql)
 
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, 
{5}".format(
                      distance_table, core_points_table, core_edge_table,
-                     reachable_points_table))
+                     reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+              min_samples, metric, depth):
+
+    n_features = num_features(source_table, expr_point)
+
+    # If squared_dist_norm2 is used, we assume eps is set for the squared 
distance
+    # That means the border only needs to be sqrt(eps) wide
+    local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+    kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+        schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+    kd_source_table = unique_string(desp='kd_source_table')
+    kd_border_table1 = unique_string(desp='kd_border_table1')
+    kd_border_table2 = unique_string(desp='kd_border_table2')
+
+    dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, 
kd_border_table1, kd_border_table2))
+
+    output_sql = """
+        CREATE TABLE {kd_source_table} AS
+            SELECT *,
+                   CASE {case_when_clause} END AS __leaf_id__
+            FROM {source_table}
+            {dist_leaf_sql}
+        """.format(**locals())
+    plpy.execute(output_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table1} AS
+            SELECT *
+            FROM {source_table}
+            WHERE {border_cl1}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table2} AS
+            SELECT *
+            FROM {source_table}
+            WHERE {border_cl2}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+                  depth, n_features, eps, **kwargs):
+    """
+        KD-tree function to create a partitioning for KNN
+        Args:
+            @param schema_madlib        Name of the Madlib Schema
+            @param source_table         Training data table
+            @param output_table         Name of the table to store kd tree
+            @param expr_point    Name of the column with training data
+                                        or expression that evaluates to a
+                                        numeric array
+            @param depth                Depth of the kd tree
+    """
+    with MinWarning("error"):
+
+        n_features = num_features(source_table, expr_point)
+
+        clauses = [' 1=1 ']
+        border_cl1 = ' 1!=1 '
+        border_cl2 = ' 1!=1 '

Review comment:
       I have a slight preference for using `True` and `False` instead of `1=1` 
and `1!=1`.  Either way is fine I guess.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, 
eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth

Review comment:
       I suggest we should remove depth as a parameter, and instead set:
   ```
   depth = int(log_2 (num_segments))+1
   ```
   ie, the number of leaves should be the next power of 2 above number of 
segments.
   
   I can't think of a situation where it would be better to set it to something 
else.  We want the number of leaves to match the number of segments as closely 
as possible, to reduce extra work in merging the leaves together later, and 
require fewer rtrees... but it can't be less than the number of segments 
otherwise we won't be able to use all the segments in parallel, some will have 
to sit idle.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table, 
id_column, expr_point, eps
             """.format(**locals())
         plpy.execute(sql)
 
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, 
{5}".format(
                      distance_table, core_points_table, core_edge_table,
-                     reachable_points_table))
+                     reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+              min_samples, metric, depth):
+
+    n_features = num_features(source_table, expr_point)
+
+    # If squared_dist_norm2 is used, we assume eps is set for the squared 
distance
+    # That means the border only needs to be sqrt(eps) wide
+    local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+    kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+        schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+    kd_source_table = unique_string(desp='kd_source_table')
+    kd_border_table1 = unique_string(desp='kd_border_table1')
+    kd_border_table2 = unique_string(desp='kd_border_table2')
+
+    dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, 
kd_border_table1, kd_border_table2))
+
+    output_sql = """
+        CREATE TABLE {kd_source_table} AS
+            SELECT *,
+                   CASE {case_when_clause} END AS __leaf_id__
+            FROM {source_table}
+            {dist_leaf_sql}
+        """.format(**locals())
+    plpy.execute(output_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table1} AS
+            SELECT *
+            FROM {source_table}
+            WHERE {border_cl1}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    border_sql = """
+        CREATE TABLE {kd_border_table2} AS
+            SELECT *
+            FROM {source_table}
+            WHERE {border_cl2}
+        """.format(**locals())
+    plpy.execute(border_sql)
+
+    return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+                  depth, n_features, eps, **kwargs):
+    """
+        KD-tree function to create a partitioning for KNN

Review comment:
       Update comment about kNN

##########
File path: src/ports/postgres/modules/dbscan/dbscan.sql_in
##########
@@ -465,3 +480,119 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan_predict(
     PythonFunction(dbscan, dbscan, dbscan_predict_help)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
+    state INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
+    state                       INTEGER[],
+    src                         BIGINT,
+    dest                        BIGINT,
+    n_rows                      INTEGER[],
+    gp_segment_id               INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER);
+CREATE AGGREGATE MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER
+)(
+    STYPE=INTEGER[][],
+    SFUNC=MADLIB_SCHEMA.sf_transition,
+    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
+    FINALFUNC=MADLIB_SCHEMA.sf_final
+);
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_transition(
+    state                       INTEGER[],
+    id_in                         BIGINT,
+    expr_points DOUBLE PRECISION[],
+    eps     DOUBLE PRECISION,
+    min_samples INTEGER,
+    metric  VARCHAR,
+    n_rows  INTEGER[],
+    leaf_id     INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_final(
+    state INTEGER[][]
+) RETURNS INTEGER[] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+--id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.rtree_step(

Review comment:
       As I understand it, for SGD the aggregate function is named 
`madlib_keras_step()` because it's a single step in the long series of steps 
taken in the gradient descent process, ie it's called in a for loop many times. 
 But for the aggregate that builds the rtree, we only call it once--so 
including `_step()` in the name seems less appropriate here.  (Same goes for 
`sf_step()` function.)
   
   Also, `rtree_step` doesn't indicate the purpose of the function, just the 
data structure used to implement it.  It took me a while reading through the 
code to figure out what this function does and how it fits into the rest. 
    I think naming it something like`find_core_points()` would increase 
readability a lot.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, 
eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, 
depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
(__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY 
(__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_table, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY 
__leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            rtree_step_table = unique_string(desp='rtree_step_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = 
unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                rtree_step_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+            CREATE TABLE {rtree_step_table} AS
+            SELECT __leaf_id__,
+                   {schema_madlib}.rtree_step( {id_column},
+                                               {expr_point},
+                                               {eps},
+                                               {min_samples},
+                                               '{metric}',
+                                               ARRAY{rt_counts_list},
+                                               __leaf_id__
+                                               )
+            FROM {cur_source_table} GROUP BY __leaf_id__

Review comment:
       I recommend we GROUP BY `gp_segment_id` instead of `__leaf_id__` here.  
We can add all the points in each segment to the same rtree, that will mean 
fewer rtrees and less to merge later.  In some cases, it may not help, but if 
the regions within a single segment happen to be near each other or 
overlapping, it could help a lot... especially if we let the user control the 
depth (as they might set it so there are many leaves in each segment.
   
   We might even be able to remove the GROUP BY entirely, but that would 
require changing the merge function a bit.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.sql_in
##########
@@ -465,3 +480,119 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan_predict(
     PythonFunction(dbscan, dbscan, dbscan_predict_help)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
+    state INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
+    state                       INTEGER[],
+    src                         BIGINT,
+    dest                        BIGINT,
+    n_rows                      INTEGER[],
+    gp_segment_id               INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER);
+CREATE AGGREGATE MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER
+)(
+    STYPE=INTEGER[][],
+    SFUNC=MADLIB_SCHEMA.sf_transition,
+    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
+    FINALFUNC=MADLIB_SCHEMA.sf_final
+);
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_transition(
+    state                       INTEGER[],
+    id_in                         BIGINT,
+    expr_points DOUBLE PRECISION[],
+    eps     DOUBLE PRECISION,
+    min_samples INTEGER,
+    metric  VARCHAR,
+    n_rows  INTEGER[],
+    leaf_id     INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_final(
+    state INTEGER[][]
+) RETURNS INTEGER[] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+--id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.rtree_step(

Review comment:
       As I understand it, for SGD the aggregate function is named 
`madlib_keras_step()` because it's a single step in the long series of steps 
taken in the gradient descent process, ie it's called in a for loop many times. 
 But for the aggregate that builds the rtree, we only call it once--so 
including `_step()` in the name seems less appropriate here.  (Same goes for 
`sf_step()` function.)
   
   Also, `rtree_step` doesn't indicate the purpose of the function, just the 
data structure used to implement it.  It took me a while reading through the 
code to figure out what this function does and how it fits into the rest. 
    I think naming it something like`find_core_points()`or `build_edge_table` 
would increase readability a lot.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.sql_in
##########
@@ -465,3 +480,119 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan_predict(
     PythonFunction(dbscan, dbscan, dbscan_predict_help)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
+    state INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
+    state                       INTEGER[],
+    src                         BIGINT,
+    dest                        BIGINT,
+    n_rows                      INTEGER[],
+    gp_segment_id               INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.sf_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER);
+CREATE AGGREGATE MADLIB_SCHEMA.sf_step(
+    BIGINT,
+    BIGINT,
+    INTEGER[],
+    INTEGER
+)(
+    STYPE=INTEGER[][],
+    SFUNC=MADLIB_SCHEMA.sf_transition,
+    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
+    FINALFUNC=MADLIB_SCHEMA.sf_final
+);
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_transition(
+    state                       INTEGER[],
+    id_in                         BIGINT,
+    expr_points DOUBLE PRECISION[],
+    eps     DOUBLE PRECISION,
+    min_samples INTEGER,
+    metric  VARCHAR,
+    n_rows  INTEGER[],
+    leaf_id     INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_merge(
+    state1          INTEGER[][],
+    state2          INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_final(
+    state INTEGER[][]
+) RETURNS INTEGER[] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+    return dbscan.rtree_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+--id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.rtree_step(

Review comment:
       As I understand it, for SGD the aggregate function is named 
`madlib_keras_step()` because it's a single step in the long series of steps 
taken in the gradient descent process, ie it's called in a for loop many times. 
 But for the aggregate that builds the rtree, we only call it once--so 
including `_step()` in the name seems less appropriate here.  (Same goes for 
`sf_step()` function.)
   
   Also, `rtree_step` doesn't indicate the purpose of the function, just the 
data structure used to implement it.  It took me a while reading through the 
code to figure out what this function does and how it fits into the rest. 
    I think naming it something like`find_core_points()`or `build_edge_table()` 
would increase readability a lot.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to