reductionista commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r461300564
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
from utilities.utilities import NUMERIC, ONLY_ARRAY
from utilities.utilities import is_valid_psql_type
from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
from utilities.validate_args import input_tbl_valid, output_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import get_algorithm_name
from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+ from rtree import index
+ from rtree.index import Rtree
Review comment:
I don't see `Rtree` used anywhere; I think we can get rid of this import.
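i.e., just:
```
try:
    from rtree import index
except ImportError:
    RTREE_ENABLED = 0
else:
    RTREE_ENABLED = 1
```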
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table,
source_table, id_column,
""".format(**locals())
result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric,
+                     n_rows, leaf_id, **kwargs):
+
+ SD = kwargs['SD']
+ if not state:
+ data = {}
+ SD['counter{0}'.format(leaf_id)] = 0
+ else:
+ data = SD['data{0}'.format(leaf_id)]
+
+ data[id_in] = expr_points
+ SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+ SD['data{0}'.format(leaf_id)] = data
+ ret = [[-1,-1],[-1,-1]]
+
+ my_n_rows = n_rows[leaf_id]
+
+ if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+ core_counts = {}
+ core_lists = {}
+ p = index.Property()
+ p.dimension = len(expr_points)
+ idx = index.Index(properties=p)
+ ret = []
+
+ if metric == 'dist_norm1':
+ fn_dist = distance.cityblock
+ elif metric == 'dist_norm2':
+ fn_dist = distance.euclidean
+ else:
+ fn_dist = distance.sqeuclidean
+
+ for key1, value1 in data.items():
+ idx.add(key1,value1+value1,key1)
+
+ for key1, value1 in data.items():
+
+ v1 = []
+ v2 = []
+ for dim in value1:
Review comment:
`dim` sounds more like the size of something; I'd go with `x`, which sounds
more like a coordinate.
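If I'm reading the truncated context right, this loop is building the eps
search box around the point; a guess at how it would read after the rename:
```
for x in value1:
    v1.append(x - eps)
    v2.append(x + eps)
```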
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table,
source_table, id_column,
""".format(**locals())
result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric,
+                     n_rows, leaf_id, **kwargs):
+
+ SD = kwargs['SD']
+ if not state:
+ data = {}
+ SD['counter{0}'.format(leaf_id)] = 0
+ else:
+ data = SD['data{0}'.format(leaf_id)]
+
+ data[id_in] = expr_points
+ SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+ SD['data{0}'.format(leaf_id)] = data
+ ret = [[-1,-1],[-1,-1]]
+
+ my_n_rows = n_rows[leaf_id]
+
+ if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+ core_counts = {}
+ core_lists = {}
+ p = index.Property()
+ p.dimension = len(expr_points)
+ idx = index.Index(properties=p)
+ ret = []
+
+ if metric == 'dist_norm1':
+ fn_dist = distance.cityblock
+ elif metric == 'dist_norm2':
+ fn_dist = distance.euclidean
+ else:
+ fn_dist = distance.sqeuclidean
+
+ for key1, value1 in data.items():
+ idx.add(key1,value1+value1,key1)
Review comment:
Maybe move this outside of the `if`? We may as well build the rtree row by
row as the points come in.
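Rough sketch of what I mean (untested; assumes an rtree `Index` object can be
stashed in `SD` between calls):
```
# on every row, before the my_n_rows check:
idx = SD.get('rtree{0}'.format(leaf_id))
if idx is None:
    p = index.Property()
    p.dimension = len(expr_points)
    idx = index.Index(properties=p)
    SD['rtree{0}'.format(leaf_id)] = idx
# insert the incoming point as a degenerate box (min == max)
idx.add(id_in, expr_points + expr_points, id_in)
```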
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table,
source_table, id_column,
""".format(**locals())
result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric,
+                     n_rows, leaf_id, **kwargs):
+
+ SD = kwargs['SD']
+ if not state:
+ data = {}
+ SD['counter{0}'.format(leaf_id)] = 0
+ else:
+ data = SD['data{0}'.format(leaf_id)]
+
+ data[id_in] = expr_points
+ SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+ SD['data{0}'.format(leaf_id)] = data
+ ret = [[-1,-1],[-1,-1]]
+
+ my_n_rows = n_rows[leaf_id]
+
+ if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+ core_counts = {}
+ core_lists = {}
+ p = index.Property()
+ p.dimension = len(expr_points)
Review comment:
I wish we could call this variable `point` rather than `expr_points`, as
it's an array of numbers representing a single point rather than an expression
or a list of multiple points or expressions. But I guess this is due to the
SQL interface to `rtree_step()`, where it does get passed a column of points or
an expression of columns.
Maybe we can change the name to `point` just for the transition function,
and keep it as-is for the aggregate function? Or would that be too confusing,
having them named differently? I can't recall if we've done that elsewhere,
but I think so.
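If we did rename it, only the Python transition function's signature would
change, something like:
```
def rtree_transition(state, id_in, point, eps, min_samples, metric,
                     n_rows, leaf_id, **kwargs):
```
while the `rtree_step()` SQL aggregate keeps `expr_points`.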
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table,
source_table, id_column,
""".format(**locals())
result = plpy.execute(sql)
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric,
+                     n_rows, leaf_id, **kwargs):
Review comment:
Some general comments about the `rtree_transition()` function:
This is functionally equivalent to running the DBSCAN algorithm on one
leaf's worth of points. But it uses a different algorithm in that it explores
the points in an arbitrary order (whatever order the dict yields) instead of a
systematic order (moving from one neighbor to the next in a
breadth-first search).
It seems like they would have similar runtime behavior in a typical case,
but I'm a bit worried there may be situations where they perform very
differently.
Somewhat related to that: it seems like it would be better to prune the
edges we won't need now, rather than later, so we have less data to send
back to the master. Maybe there's a reason I'm missing why this aggregate
function and the snowflake aggregate function have to be separate, but I think
they ought to be combined if possible. Being able to do lookups in the rtree
might also help the snowflake stage run faster.
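For concreteness, a hypothetical sketch of the breadth-first version I mean
(`expand_cluster` is a made-up name; it assumes the rtree `idx` and the `data`
dict from above):
```
from collections import deque

def expand_cluster(seed_id, data, idx, eps, fn_dist, min_samples, visited):
    """Explore neighbors breadth-first from seed_id, emitting core edges."""
    queue = deque([seed_id])
    edges = []
    while queue:
        pid = queue.popleft()
        if pid in visited:
            continue
        visited.add(pid)
        point = data[pid]
        # rtree gives candidates in the bounding box; filter by the actual
        # metric (for squared_dist_norm2 the box half-width would be sqrt(eps))
        bbox = [x - eps for x in point] + [x + eps for x in point]
        neighbors = [c for c in idx.intersection(bbox)
                     if fn_dist(point, data[c]) <= eps]
        if len(neighbors) >= min_samples:   # pid is a core point
            for c in neighbors:
                edges.append((pid, c))
                if c not in visited:
                    queue.append(c)
    return edges
```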
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table,
id_column, expr_point, eps
""".format(**locals())
plpy.execute(sql)
- plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format(
distance_table, core_points_table, core_edge_table,
- reachable_points_table))
+ reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+ min_samples, metric, depth):
+
+ n_features = num_features(source_table, expr_point)
+
+ # If squared_dist_norm2 is used, we assume eps is set for the squared distance
+ # That means the border only needs to be sqrt(eps) wide
+ local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+ kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+ schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+ kd_source_table = unique_string(desp='kd_source_table')
+ kd_border_table1 = unique_string(desp='kd_border_table1')
+ kd_border_table2 = unique_string(desp='kd_border_table2')
+
+ dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, kd_border_table1, kd_border_table2))
+
+ output_sql = """
+ CREATE TABLE {kd_source_table} AS
+ SELECT *,
+ CASE {case_when_clause} END AS __leaf_id__
+ FROM {source_table}
+ {dist_leaf_sql}
+ """.format(**locals())
+ plpy.execute(output_sql)
+
+ border_sql = """
+ CREATE TABLE {kd_border_table1} AS
+ SELECT *
+ FROM {source_table}
+ WHERE {border_cl1}
+ """.format(**locals())
+ plpy.execute(border_sql)
+
+ border_sql = """
+ CREATE TABLE {kd_border_table2} AS
+ SELECT *
+ FROM {source_table}
+ WHERE {border_cl2}
+ """.format(**locals())
+ plpy.execute(border_sql)
+
+ return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+ depth, n_features, eps, **kwargs):
+ """
+ KD-tree function to create a partitioning for KNN
+ Args:
+ @param schema_madlib Name of the Madlib Schema
+ @param source_table Training data table
+ @param output_table Name of the table to store kd tree
+ @param expr_point Name of the column with training data
+ or expression that evaluates to a
+ numeric array
+ @param depth Depth of the kd tree
+ """
+ with MinWarning("error"):
+
+ n_features = num_features(source_table, expr_point)
+
+ clauses = [' 1=1 ']
+ border_cl1 = ' 1!=1 '
+ border_cl2 = ' 1!=1 '
Review comment:
I have a slight preference for using `TRUE` and `FALSE` instead of `1=1`
and `1!=1`. Either way is fine, I guess.
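i.e., something like:
```
clauses = [' TRUE ']
border_cl1 = ' FALSE '
border_cl2 = ' FALSE '
```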
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
from utilities.utilities import NUMERIC, ONLY_ARRAY
from utilities.utilities import is_valid_psql_type
from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
from utilities.validate_args import input_tbl_valid, output_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import get_algorithm_name
from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+ from rtree import index
+ from rtree.index import Rtree
+except ImportError:
+ RTREE_ENABLED=0
+else:
+ RTREE_ENABLED=1
+
BRUTE_FORCE = 'brute_force'
KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+ eps, min_samples, metric, algorithm, depth, **kwargs):
with MinWarning("warning"):
- min_samples = 5 if not min_samples else min_samples
- metric = 'squared_dist_norm2' if not metric else metric
- algorithm = 'brute' if not algorithm else algorithm
+ min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+ metric = DEFAULT_METRIC if not metric else metric
+ algorithm = BRUTE_FORCE if not algorithm else algorithm
+ depth = DEFAULT_KD_DEPTH if not depth else depth
Review comment:
I suggest we remove depth as a parameter, and instead set:
```
depth = int(log_2(num_segments)) + 1
```
i.e., the number of leaves should be the next power of 2 above the number of
segments.
I can't think of a situation where it would be better to set it to something
else. We want the number of leaves to match the number of segments as closely
as possible, to reduce extra work in merging the leaves together later, and to
require fewer rtrees... but it can't be less than the number of segments,
otherwise we won't be able to use all the segments in parallel; some will have
to sit idle.
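In code, something like this sketch (`get_seg_number()` is already imported in
this module):
```
from math import log

num_segments = get_seg_number()
# 2**depth leaves, always >= num_segments
depth = int(log(num_segments, 2)) + 1
```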
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -175,9 +383,113 @@ def dbscan(schema_madlib, source_table, output_table,
id_column, expr_point, eps
""".format(**locals())
plpy.execute(sql)
- plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format(
distance_table, core_points_table, core_edge_table,
- reachable_points_table))
+ reachable_points_table, sf_step_table, sf_edge_table))
+
+def dbscan_kd(schema_madlib, source_table, id_column, expr_point, eps,
+ min_samples, metric, depth):
+
+ n_features = num_features(source_table, expr_point)
+
+ # If squared_dist_norm2 is used, we assume eps is set for the squared distance
+ # That means the border only needs to be sqrt(eps) wide
+ local_eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+ kd_array, case_when_clause, border_cl1, border_cl2 = build_kd_tree(
+ schema_madlib, source_table, expr_point, depth, n_features, local_eps)
+
+ kd_source_table = unique_string(desp='kd_source_table')
+ kd_border_table1 = unique_string(desp='kd_border_table1')
+ kd_border_table2 = unique_string(desp='kd_border_table2')
+
+ dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(kd_source_table, kd_border_table1, kd_border_table2))
+
+ output_sql = """
+ CREATE TABLE {kd_source_table} AS
+ SELECT *,
+ CASE {case_when_clause} END AS __leaf_id__
+ FROM {source_table}
+ {dist_leaf_sql}
+ """.format(**locals())
+ plpy.execute(output_sql)
+
+ border_sql = """
+ CREATE TABLE {kd_border_table1} AS
+ SELECT *
+ FROM {source_table}
+ WHERE {border_cl1}
+ """.format(**locals())
+ plpy.execute(border_sql)
+
+ border_sql = """
+ CREATE TABLE {kd_border_table2} AS
+ SELECT *
+ FROM {source_table}
+ WHERE {border_cl2}
+ """.format(**locals())
+ plpy.execute(border_sql)
+
+ return kd_source_table, kd_border_table1, kd_border_table2
+
+
+def build_kd_tree(schema_madlib, source_table, expr_point,
+ depth, n_features, eps, **kwargs):
+ """
+ KD-tree function to create a partitioning for KNN
Review comment:
Update this docstring: it still says the kd-tree partitioning is for KNN, but
here it's used for DBSCAN.
##########
File path: src/ports/postgres/modules/dbscan/dbscan.sql_in
##########
@@ -465,3 +480,119 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.dbscan_predict(
PythonFunction(dbscan, dbscan, dbscan_predict_help)
$$ LANGUAGE plpythonu VOLATILE
m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_merge(
+ state1 INTEGER[][],
+ state2 INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.sf_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_final(
+ state INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.sf_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.sf_transition(
+ state INTEGER[],
+ src BIGINT,
+ dest BIGINT,
+ n_rows INTEGER[],
+ gp_segment_id INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.sf_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.sf_step(
+ BIGINT,
+ BIGINT,
+ INTEGER[],
+ INTEGER);
+CREATE AGGREGATE MADLIB_SCHEMA.sf_step(
+ BIGINT,
+ BIGINT,
+ INTEGER[],
+ INTEGER
+)(
+ STYPE=INTEGER[][],
+ SFUNC=MADLIB_SCHEMA.sf_transition,
+ m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.sf_merge,')
+ FINALFUNC=MADLIB_SCHEMA.sf_final
+);
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_transition(
+ state INTEGER[],
+ id_in BIGINT,
+ expr_points DOUBLE PRECISION[],
+ eps DOUBLE PRECISION,
+ min_samples INTEGER,
+ metric VARCHAR,
+ n_rows INTEGER[],
+ leaf_id INTEGER
+
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.rtree_transition(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_merge(
+ state1 INTEGER[][],
+ state2 INTEGER[][]
+) RETURNS INTEGER[][] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.rtree_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.rtree_final(
+ state INTEGER[][]
+) RETURNS INTEGER[] AS $$
+PythonFunctionBodyOnlyNoSchema(`dbscan', `dbscan')
+ return dbscan.rtree_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+--id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id,
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.rtree_step(
Review comment:
As I understand it, for SGD the aggregate function is named
`madlib_keras_step()` because it's a single step in the long series of steps
taken in the gradient descent process, i.e. it's called in a for loop many
times. But for the aggregate that builds the rtree, we only call it once, so
including `_step()` in the name seems less appropriate here. (Same goes for
the `sf_step()` function.)
Also, `rtree_step` doesn't indicate the purpose of the function, just the
data structure used to implement it. It took me a while reading through the
code to figure out what this function does and how it fits into the rest.
I think naming it something like `find_core_points()` or `build_edge_table()`
would increase readability a lot.
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
from utilities.utilities import NUMERIC, ONLY_ARRAY
from utilities.utilities import is_valid_psql_type
from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
from utilities.validate_args import input_tbl_valid, output_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import get_algorithm_name
from graph.wcc import wcc
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+try:
+ from rtree import index
+ from rtree.index import Rtree
+except ImportError:
+ RTREE_ENABLED=0
+else:
+ RTREE_ENABLED=1
+
BRUTE_FORCE = 'brute_force'
KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+ eps, min_samples, metric, algorithm, depth, **kwargs):
with MinWarning("warning"):
- min_samples = 5 if not min_samples else min_samples
- metric = 'squared_dist_norm2' if not metric else metric
- algorithm = 'brute' if not algorithm else algorithm
+ min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+ metric = DEFAULT_METRIC if not metric else metric
+ algorithm = BRUTE_FORCE if not algorithm else algorithm
+ depth = DEFAULT_KD_DEPTH if not depth else depth
algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
[BRUTE_FORCE, KD_TREE], 'DBSCAN')
_validate_dbscan(schema_madlib, source_table, output_table, id_column,
- expr_point, eps, min_samples, metric, algorithm)
+ expr_point, eps, min_samples, metric, algorithm, depth)
dist_src_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
dist_id_sql = '' if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
dist_reach_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+ dist_leaf_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
- # Calculate pairwise distances
+ core_points_table = unique_string(desp='core_points_table')
+ core_edge_table = unique_string(desp='core_edge_table')
distance_table = unique_string(desp='distance_table')
- plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+ core_points_table, core_edge_table, distance_table))
+
+ if algorithm == KD_TREE:
+ cur_source_table, border_table1, border_table2 = dbscan_kd(
+ schema_madlib, source_table, id_column, expr_point, eps,
+ min_samples, metric, depth)
+
+ kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+ sql = """
+ SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+ """.format(**locals())
+ result = plpy.execute(sql)
+ rt_counts_dict = {}
+ for i in result:
+ rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+ rt_counts_list = []
+ for i in sorted(rt_counts_dict):
+ rt_counts_list.append(rt_counts_dict[i])
+
+ leaf_id_start = pow(2,depth)-1
+
+ rtree_step_table = unique_string(desp='rtree_step_table')
+ rt_edge_table = unique_string(desp='rt_edge_table')
+ rt_core_points_table = unique_string(desp='rt_core_points_table')
+ border_core_points_table = unique_string(desp='border_core_points_table')
+ border_edge_table = unique_string(desp='border_edge_table')
+ plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+ rtree_step_table, rt_edge_table, rt_core_points_table,
+ border_core_points_table, border_edge_table))
+
+ sql = """
+ CREATE TABLE {rtree_step_table} AS
+ SELECT __leaf_id__,
+ {schema_madlib}.rtree_step( {id_column},
+ {expr_point},
+ {eps},
+ {min_samples},
+ '{metric}',
+ ARRAY{rt_counts_list},
+ __leaf_id__
+ )
+ FROM {cur_source_table} GROUP BY __leaf_id__
Review comment:
I recommend we GROUP BY `gp_segment_id` instead of `__leaf_id__` here.
We can add all the points in each segment to the same rtree; that will mean
fewer rtrees and less to merge later. In some cases it may not help, but if
the regions within a single segment happen to be near each other or
overlapping, it could help a lot... especially if we let the user control the
depth (as they might set it so there are many leaves in each segment).
We might even be able to remove the GROUP BY entirely, but that would
require changing the merge function a bit.
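Sketch of the change (hypothetical; `rt_counts_list` would also have to become
per-segment counts rather than per-leaf counts):
```
sql = """
    CREATE TABLE {rtree_step_table} AS
    SELECT gp_segment_id,
           {schema_madlib}.rtree_step( {id_column},
                                       {expr_point},
                                       {eps},
                                       {min_samples},
                                       '{metric}',
                                       ARRAY{rt_counts_list},
                                       gp_segment_id
                                     )
    FROM {cur_source_table} GROUP BY gp_segment_id
    """.format(**locals())
```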
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]