Repository: incubator-madlib Updated Branches: refs/heads/master a19a9d6ce -> 3f599c943
Feature: Weakly connected components helper functions JIRA: MADLIB-1101 Add several helper functions that will quickly return back various useful stats based on the connected components learned from the madlib.weakly_connected_components() function. Five helper functions are added as part of this story, along with docs and updated install check. The helper functions are: - graph_wcc_largest_cpt(): finds largest components - graph_wcc_histogram(): finds number of vertices in each component - graph_wcc_vertex_check(): finds all components that have a given pair of vertices in them. - graph_wcc_num_cpts(): finds total number of components. - graph_wcc_reachable_vertices(): finds all vertices reachable within a component for a given source vertex. All these functions are implemented to handle grouping columns too if the WCC's output table was created with grouping_cols. Closes #155 Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/3f599c94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/3f599c94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/3f599c94 Branch: refs/heads/master Commit: 3f599c94306cade62dd86ca588217c6c5f65590e Parents: a19a9d6 Author: Nandish Jayaram <njaya...@apache.org> Authored: Tue Jul 18 09:31:09 2017 -0700 Committer: Nandish Jayaram <njaya...@apache.org> Committed: Fri Jul 28 13:34:14 2017 -0700 ---------------------------------------------------------------------- .../postgres/modules/graph/graph_utils.py_in | 42 +- .../postgres/modules/graph/test/wcc.sql_in | 48 +- src/ports/postgres/modules/graph/wcc.py_in | 453 ++++++++++++++++++- src/ports/postgres/modules/graph/wcc.sql_in | 345 +++++++++++++- 4 files changed, 848 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3f599c94/src/ports/postgres/modules/graph/graph_utils.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in index 9d31345..c9f6b73 100644 --- a/src/ports/postgres/modules/graph/graph_utils.py_in +++ b/src/ports/postgres/modules/graph/graph_utils.py_in @@ -27,7 +27,7 @@ @namespace graph """ -from utilities.utilities import _assert +from utilities.utilities import _assert, add_postfix from utilities.validate_args import get_cols from utilities.validate_args import unquote_ident from utilities.validate_args import table_exists @@ -36,7 +36,6 @@ from utilities.validate_args import table_is_empty def _grp_null_checks(grp_list): - """ Helper function for generating NULL checks for grouping columns to be used within a WHERE clause @@ -44,7 +43,7 @@ def _grp_null_checks(grp_list): @param grp_list The list of grouping columns """ return ' AND '.join([" {i} IS NOT NULL ".format(**locals()) - for i in grp_list]) + for i in grp_list]) def _check_groups(tbl1, tbl2, grp_list): @@ -71,6 +70,43 @@ def _grp_from_table(tbl, grp_list): for i in grp_list]) +def validate_output_and_summary_tables(model_out_table, module_name, + out_table=None): + """ + Validate a output table, and the associated summary table. The + assumption here is that, given a model_out_table, there is also a summary + table named model_out_table+"_summary" created. This function checks for + the availability of both these tables. + Optionally, the absence of an 'out_table' can also be checked for, which + is the table that is to be created. + Args: + @param model_out_table + @param module_name + @param out_table (optional) + + Results: + Throws an error if either model_out_table or model_out_table_"_summary" + is not present. It also throws an error out_table (if specified) + is already present. 
+ """ + _assert(model_out_table and model_out_table.strip().lower() not in ('null', ''), + "Graph {0}: Invalid {0} table name.".format(module_name)) + _assert(table_exists(model_out_table), + "Graph {0}: {0} table ({1}) is missing.".format(module_name, model_out_table)) + _assert(not table_is_empty(model_out_table), + "Graph {0}: {0} table ({1}) is empty.".format(module_name, model_out_table)) + + summary = add_postfix(model_out_table, "_summary") + _assert(table_exists(summary), + "Graph {0}: {0} summary table ({1}) is missing.".format(module_name, summary)) + _assert(not table_is_empty(summary), + "Graph {0}: {0} summary table ({1}) is empty.".format(module_name, summary)) + + if out_table: + _assert(not table_exists(out_table), + "Graph WCC: Output table {0} already exists.".format(out_table)) + + def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, out_table, func_name, **kwargs): """ http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3f599c94/src/ports/postgres/modules/graph/test/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/test/wcc.sql_in b/src/ports/postgres/modules/graph/test/wcc.sql_in index 3751eb0..a943f6e 100644 --- a/src/ports/postgres/modules/graph/test/wcc.sql_in +++ b/src/ports/postgres/modules/graph/test/wcc.sql_in @@ -63,7 +63,7 @@ INSERT INTO edge VALUES (15, 16, 1), (15, 14, 1); -DROP TABLE IF EXISTS wcc_out; +DROP TABLE IF EXISTS wcc_out, wcc_out_summary; SELECT weakly_connected_components( 'vertex', 'vertex_id', @@ -95,7 +95,7 @@ INSERT INTO edge VALUES (15, 16, 2), (15, 14, 2); -DROP TABLE IF EXISTS wcc_out; +DROP TABLE IF EXISTS wcc_out, wcc_out_summary; SELECT weakly_connected_components( 'vertex', 'vertex_id', @@ -114,3 +114,47 @@ SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001, SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001, 'Weakly Connected Components: Number of components 
found is not 4.' ) FROM wcc_out WHERE user_id=1; + +-- Test WCC helper functions: +DROP TABLE IF EXISTS largest_cpt_table; +SELECT madlib.graph_wcc_largest_cpt( + 'wcc_out', -- WCC's output table + 'largest_cpt_table'); -- output table +SELECT assert(relative_error(num_vertices, 6) < 0.00001, + 'Weakly Connected Components: Incorrect largest component value.' + ) FROM largest_cpt_table WHERE user_id=2; + +DROP TABLE IF EXISTS histogram_table; +SELECT madlib.graph_wcc_histogram( + 'wcc_out', -- WCC's output table + 'histogram_table'); -- output table +SELECT assert(relative_error(num_vertices, 4) < 0.00001, + 'Weakly Connected Components: Incorrect histogram value.' + ) FROM histogram_table WHERE user_id=1 and component_id=10; + +DROP TABLE IF EXISTS vc_table; +SELECT madlib.graph_wcc_vertex_check( + 'wcc_out', -- WCC's output table + '14,15', -- Pair of vertex IDs + 'vc_table'); -- output table +SELECT assert(relative_error(component_id, 14) < 0.00001, + 'Weakly Connected Components: Incorrect vertex check value.' + ) FROM vc_table WHERE user_id=1; + +DROP TABLE IF EXISTS reach_table; +SELECT madlib.graph_wcc_reachable_vertices( + 'wcc_out', -- WCC's output table + '0', -- source vertex + 'reach_table'); -- output table +SELECT assert(relative_error(count(dest), 5) < 0.00001, + 'Weakly Connected Components: Incorrect reachable vertices value.' + ) FROM reach_table WHERE user_id=2 and component_id=0; + +DROP TABLE IF EXISTS count_table; +SELECT madlib.graph_wcc_num_cpts( + 'wcc_out', -- WCC's output table + 'count_table'); -- output table +SELECT assert(relative_error(num_components, 3) < 0.00001, + 'Weakly Connected Components: Incorrect largest component value.' 
+ ) FROM count_table WHERE user_id=1; + http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3f599c94/src/ports/postgres/modules/graph/wcc.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in index 7027b29..bf905c8 100644 --- a/src/ports/postgres/modules/graph/wcc.py_in +++ b/src/ports/postgres/modules/graph/wcc.py_in @@ -31,18 +31,25 @@ import plpy from utilities.utilities import _assert from utilities.utilities import extract_keyvalue_params from utilities.utilities import unique_string, split_quoted_delimited_str -from utilities.validate_args import columns_exist_in_table +from utilities.validate_args import columns_exist_in_table, get_expr_type from utilities.utilities import is_platform_pg, is_platform_hawq +from utilities.utilities import add_postfix +from utilities.validate_args import table_exists +from utilities.control import MinWarning from graph_utils import validate_graph_coding, get_graph_usage +from graph_utils import validate_output_and_summary_tables def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, - edge_params, out_table, grouping_cols_list, module_name): + edge_params, out_table, out_table_summary, + grouping_cols_list, module_name): """ Function to validate input parameters for wcc """ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, out_table, module_name) + _assert(not table_exists(out_table_summary), + "Graph {module_name}: Output summary table already exists!".format(**locals())) if grouping_cols_list: # validate the grouping columns. We currently only support grouping_cols # to be column names in the edge_table, and not expressions! 
@@ -57,7 +64,7 @@ def prefix_tablename_to_colnames(table, cols_list): def get_where_condition(table1, table2, cols_list): return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col) - for col in cols_list]) + for col in cols_list]) def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, @@ -81,18 +88,24 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, plpy.execute('SET client_min_messages TO warning') params_types = {'src': str, 'dest': str} default_args = {'src': 'src', 'dest': 'dest'} - edge_params = extract_keyvalue_params(edge_args, params_types, default_args) + edge_params = extract_keyvalue_params( + edge_args, params_types, default_args) - # populate default values for optional params if null - if vertex_id is None: + # populate default values for optional params if null, and prepare data + # to be written into the summary table (*_st variable names) + if not vertex_id: vertex_id = "id" + v_st = "id" + else: + v_st = vertex_id if not grouping_cols: grouping_cols = '' + out_table_summary = add_postfix(out_table, "_summary") grouping_cols_list = split_quoted_delimited_str(grouping_cols) validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, - edge_params, out_table, grouping_cols_list, - 'Weakly Connected Components') + edge_params, out_table, out_table_summary, + grouping_cols_list, 'Weakly Connected Components') src = edge_params["src"] dest = edge_params["dest"] @@ -102,7 +115,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, toupdate = unique_string(desp='toupdate') temp_out_table = unique_string(desp='tempout') - distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id) + distribution = '' if is_platform_pg() else \ + "DISTRIBUTED BY ({0})".format(vertex_id) subq_prefixed_grouping_cols = '' comma_toupdate_prefixed_grouping_cols = '' comma_oldupdate_prefixed_grouping_cols = '' @@ -118,7 +132,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args, if grouping_cols: distribution = ('' if is_platform_pg() else - "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id)) + "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, + vertex_id)) # Update some variables useful for grouping based query strings subq = unique_string(desp='subquery') distinct_grp_table = unique_string(desp='grptable') @@ -130,16 +145,21 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, pttc = prefix_tablename_to_colnames gwc = get_where_condition - comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list) - comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list) + comma_toupdate_prefixed_grouping_cols = ', ' + \ + pttc(toupdate, grouping_cols_list) + comma_oldupdate_prefixed_grouping_cols = ', ' + \ + pttc(oldupdate, grouping_cols_list) subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list) - old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list) - new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list) - edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list) + old_new_update_where_condition = ' AND ' + \ + gwc(oldupdate, newupdate, grouping_cols_list) + new_to_update_where_condition = ' AND ' + \ + gwc(newupdate, toupdate, grouping_cols_list) + edge_to_update_where_condition = ' AND ' + \ + gwc(edge_table, toupdate, grouping_cols_list) join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list) group_by_clause_newupdate = ('' if not grouping_cols else - '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols, - subq, vertex_id)) + '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols, + subq, vertex_id)) plpy.execute(""" CREATE TABLE {newupdate} AS SELECT {subq}.{vertex_id}, @@ -206,7 +226,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, FROM {message} GROUP BY {group_by_clause} {vertex_id} {distribution} - 
""".format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols), + """.format(grouping_cols_select='' if not grouping_cols else + ', {0}'.format(grouping_cols), group_by_clause=grouping_cols_comma, **locals())) @@ -298,12 +319,319 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, """.format(**locals()))[0]["cnt"] plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table)) + # Create summary table. We only need the vertex_id and grouping columns + # in it. + plpy.execute(""" + CREATE TABLE {out_table_summary} ( + {grouping_cols_summary} + vertex_table TEXT, + vertex_id TEXT, + vertex_id_type TEXT + ) + """.format(grouping_cols_summary='' if not grouping_cols else + 'grouping_cols TEXT, ', **locals())) + vertex_id_type = get_expr_type(vertex_id, vertex_table) + plpy.execute(""" + INSERT INTO {out_table_summary} VALUES + ({grouping_cols_summary} '{vertex_table}', '{vertex_id}', + '{vertex_id_type}') + """.format(grouping_cols_summary='' if not grouping_cols else + "'{0}', ".format(grouping_cols), **locals())) plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3} """.format(message, oldupdate, newupdate, toupdate)) if is_hawq: plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table)) +# WCC Helper functions: +def extract_wcc_summary_cols(wcc_summary_table): + """ + WCC helper function to find all values stored in the summary table. + Args: + @param wcc_summary_table + + Returns: + Dictionary, containing the column names and their values. The + keys in the dictionary are 'vertex_id', 'vertex_id_type' and + 'grouoping_cols' if grouping cols exist. + """ + return plpy.execute("SELECT * FROM {wcc_summary_table} ".format( + **locals()))[0] + + +def preprocess_wcc_table_args(wcc_table, out_table): + """ + Validate wcc_table, wcc_table_summary and the output tables. Read + the summary table and return a dictionary of the summary table. 
+ """ + validate_output_and_summary_tables(wcc_table, "WCC", out_table) + wcc_summary_table = add_postfix(wcc_table, "_summary") + return extract_wcc_summary_cols(wcc_summary_table) + +def check_input_vertex_validity(wcc_args, vertices): + """ + Function to check if vertices are all valid, i.e., are present + in the WCC's original input vertex table. Even if one of the input + vertices (when more than one) is not valid, return False + Args: + @param wcc_args (dict) + @param vertices (list) + Returns: + True if all vertices in the list are present in the original input + vertex table, False otherwise. + """ + vertex_table = wcc_args['vertex_table'] + _assert(table_exists(vertex_table), + "Graph WCC: Input vertex table '{0}' does not exist.".format( + vertex_table)) + vertex_col = wcc_args['vertex_id'] + where_clause = ' OR '.join(["{0}='{1}'".format(vertex_col, v) + for v in vertices]) + count = plpy.execute(""" + SELECT COUNT(*) as count FROM ( + SELECT 1 FROM {vertex_table} + WHERE {where_clause} + ) t + """.format(**locals()))[0]['count'] + _assert(count == len(vertices), + "Graph WCC: Invalid input vertex in {0}.".format(str(vertices))) + +def create_component_cnts_table(wcc_table, cnts_out_table, + grouping_cols_comma): + """ + WCC helper function to create a table containing the number of vertices + per component. + + Args: + @param wcc_table + @param cnts_out_table + @param grouping_cols_comma + + Returns: + Creates a new table called cnts_out_table with necessary content. 
+ """ + plpy.execute(""" + CREATE TABLE {cnts_out_table} AS + SELECT {grouping_cols_select} component_id, COUNT(*) as num_vertices + FROM {wcc_table} + GROUP BY {group_by_clause} component_id + """.format(grouping_cols_select=grouping_cols_comma, + group_by_clause=grouping_cols_comma, **locals())) + + +def graph_wcc_largest_cpt(schema_madlib, wcc_table, largest_cpt_table, + **kwargs): + """ + WCC helper function that computes the largest weakly connected component + in each group (if grouping cols are defined) + + Args: + @param wcc_table + @param largest_cpt_table + + Returns: + Creates table largest_cpt_table that contains a column called + component_id that refers to the largest component. If grouping_cols + are defined, columns corresponding to the grouping_cols are also + created, and the largest component is computed with regard to a group. + """ + with MinWarning("warning"): + wcc_args = preprocess_wcc_table_args(wcc_table, largest_cpt_table) + # Create temp table containing the number of vertices in each + # component. + tmp_cnt_table = unique_string(desp='tmpcnt') + if 'grouping_cols' in wcc_args: + grouping_cols = wcc_args['grouping_cols'] + else: + grouping_cols = '' + glist = split_quoted_delimited_str(grouping_cols) + grouping_cols_comma = '' if not grouping_cols else grouping_cols + ',' + + subq = unique_string(desp='q') + subt = unique_string(desp='t') + create_component_cnts_table(wcc_table, tmp_cnt_table, + grouping_cols_comma) + # Query to find ALL largest components within groups. 
+ select_grouping_cols_subq = '' + groupby_clause_subt = '' + grouping_cols_join = '' + if grouping_cols: + select_grouping_cols_subq = ', '.join(['{0}.{1}'.format(subq, gcol) + for gcol in glist]) + ', ' + groupby_clause_subt = ' GROUP BY {0}'.format(grouping_cols) + grouping_cols_join = ' AND ' + ', '.join(['{0}.{2}={1}.{2}'.format( + subq, subt, gcol) for gcol in glist]) + plpy.execute(""" + CREATE TABLE {largest_cpt_table} AS + SELECT {select_grouping_cols_subq} {subq}.component_id, + {subt}.maxcnt AS num_vertices + FROM {tmp_cnt_table} AS {subq} + INNER JOIN ( + SELECT {grouping_cols_select_subt} + MAX(num_vertices) AS maxcnt + FROM {tmp_cnt_table} + {groupby_clause_subt} + ) {subt} + ON {subq}.num_vertices={subt}.maxcnt + {grouping_cols_join} + """.format(grouping_cols_select_subt=grouping_cols_comma, + **locals())) + # Drop temp table + plpy.execute("DROP TABLE IF EXISTS {0}".format(tmp_cnt_table)) + + +def graph_wcc_histogram(schema_madlib, wcc_table, histogram_table, **kwargs): + """ + Retrieve Histogram of Vertices Per Connected Component + + Args: + @param wcc_table + @param histogram_table + + Returns: + Creates and populates histogram_table with number of vertices per + component (represented by column num_vertices). Columns corresponding + to grouping_cols are also created if defined. + """ + with MinWarning("warning"): + wcc_args = preprocess_wcc_table_args(wcc_table, histogram_table) + grouping_cols_comma = '' + if 'grouping_cols' in wcc_args: + grouping_cols_comma = wcc_args['grouping_cols'] + ', ' + create_component_cnts_table(wcc_table, histogram_table, + grouping_cols_comma) + + +def graph_wcc_vertex_check(schema_madlib, wcc_table, vertex_pair, pair_table, + **kwargs): + """ + WCC helper function to check if two vertices belong to the same component. 
+ + Args: + @param wcc_table + @param vertex_pair + @param pair_table + + Returns: + Creates and populates pair_table with all the components that have + both the vertices specified in the vertex_pair attribute. There are + columns for grouping, if specified. + """ + with MinWarning("warning"): + wcc_args = preprocess_wcc_table_args(wcc_table, pair_table) + vertices = split_quoted_delimited_str(vertex_pair) + _assert(vertices and len(vertices) == 2, + "Graph WCC: Invalid vertex pair ({0}) input.".format( + vertex_pair)) + check_input_vertex_validity(wcc_args, vertices) + grouping_cols_comma = '' + if 'grouping_cols' in wcc_args: + grouping_cols_comma = wcc_args['grouping_cols'] + ', ' + subq = unique_string(desp='subq') + inner_select_clause = " SELECT {0} component_id ".format( + grouping_cols_comma) + inner_from_clause = " FROM {0} ".format(wcc_table) + inner_groupby_clause = " GROUP BY {0} component_id".format( + grouping_cols_comma) + plpy.execute(""" + CREATE TABLE {pair_table} AS + SELECT {grouping_cols_comma} component_id + FROM ( + {inner_select_clause}, 1 + {inner_from_clause} + WHERE {vertex_id}='{vertex1}' + {inner_groupby_clause} + UNION ALL + {inner_select_clause}, 2 + {inner_from_clause} + WHERE {vertex_id}='{vertex2}' + {inner_groupby_clause} + ) {subq} + GROUP BY {grouping_cols_comma} component_id + HAVING COUNT(*)=2 + """.format(vertex_id=wcc_args['vertex_id'], + vertex1=vertices[0], vertex2=vertices[1], **locals())) + + +def graph_wcc_reachable_vertices(schema_madlib, wcc_table, src, + reachable_vertices_table, **kwargs): + """ + WCC helper function to retrieve all vertices reachable from a vertex + + Args: + @param wcc_table + @param src + @param reachable_vertices_table + + Results: + Creates and populates reachable_vertices_table table with all the + vertices reachable from src vertex, where reachability is with + regard to a component. There are columns for grouping, if specified. 
+ """ + with MinWarning("warning"): + wcc_args = preprocess_wcc_table_args(wcc_table, + reachable_vertices_table) + check_input_vertex_validity(wcc_args, split_quoted_delimited_str(src)) + grouping_cols_comma = '' + grouping_cols = '' + if 'grouping_cols' in wcc_args: + grouping_cols = wcc_args['grouping_cols'] + grouping_cols_comma = grouping_cols + ', ' + vertex_id = wcc_args['vertex_id'] + subq = unique_string(desp='subq') + glist = split_quoted_delimited_str(grouping_cols) + grouping_cols_join = '' if not grouping_cols else ' AND ' + \ + ', '.join(['{0}.{2}={1}.{2}'.format(wcc_table, subq, gcol) + for gcol in glist]) + subq_grouping_cols = '' if not grouping_cols else ', '.join( + ['{0}.{1}'.format(subq, gcol) for gcol in glist]) + ', ' + plpy.execute(""" + CREATE TABLE {reachable_vertices_table} AS + SELECT {subq_grouping_cols} {subq}.component_id, + {wcc_table}.{vertex_id} AS dest + FROM {wcc_table} + INNER JOIN ( + SELECT {grouping_cols_comma} component_id, {vertex_id} + FROM {wcc_table} + GROUP BY {vertex_id}, {grouping_cols_comma} component_id + HAVING {vertex_id}='{src}' + ) {subq} + ON {wcc_table}.component_id={subq}.component_id + {grouping_cols_join} + WHERE {wcc_table}.{vertex_id} != '{src}' + """.format(**locals())) + + +def graph_wcc_num_cpts(schema_madlib, wcc_table, count_table, **kwargs): + """ + WCC helper function to count the number of connected components + + Args: + @param: wcc_table + @param: count_table + + Results: + Creates and populates the count_table table with the total number + of components. If grouping_cols is involved, number of components + are computed with regard to a group. 
+ """ + with MinWarning("warning"): + wcc_args = preprocess_wcc_table_args(wcc_table, count_table) + grouping_cols = '' + grouping_cols_comma = '' + if 'grouping_cols' in wcc_args: + grouping_cols = wcc_args['grouping_cols'] + grouping_cols_comma = grouping_cols + ', ' + plpy.execute(""" + CREATE TABLE {count_table} AS + SELECT {grouping_cols_comma} + COUNT(DISTINCT component_id) AS num_components + FROM {wcc_table} + {grp_by_clause} + """.format(grp_by_clause='' if not grouping_cols else + ' GROUP BY {0}'.format(grouping_cols), **locals())) + + def wcc_help(schema_madlib, message, **kwargs): """ Help function for wcc @@ -322,10 +650,51 @@ def wcc_help(schema_madlib, message, **kwargs): help_string = get_graph_usage( schema_madlib, 'Weakly Connected Components', - """out_table TEXT, -- Output table of weakly connected components - grouping_col TEXT -- Comma separated column names to group on - -- (DEFAULT = NULL, no grouping) - """) + """out_table TEXT, -- Output table of weakly connected components + grouping_col TEXT -- Comma separated column names to group on + -- (DEFAULT = NULL, no grouping) + """) + """ + + Once the above function is used to obtain the out_table, it can be used to + call several other helper functions based on weakly connected components: + + (1) To retrieve the largest connected component: + SELECT {schema_madlib}.graph_wcc_largest_cpt( + wcc_table TEXT, -- Name of the table that contains the WCC output. + largest_cpt_table TEXT -- Name of the output table that contains the + -- largest components details. + ); + + (2) To retrieve the histogram of vertices per connected component: + SELECT {schema_madlib}.graph_wcc_histogram( + wcc_table TEXT, -- Name of the table that contains the WCC output. + histogram_table TEXT -- Name of the output table that contains the + -- histogram of vertices per connected component. 
+ ); + + (3) To check if two vertices belong to the same component: + SELECT {schema_madlib}.graph_wcc_vertex_check( + wcc_table TEXT, -- Name of the table that contains the WCC output. + vertex_pair TEXT, -- Pair of vertex IDs, separated by a comma. + pair_table TEXT -- Name of the output table that contains the all + -- components that contain the two vertices. + ); + + (4) To retrieve all vertices reachable from a vertex: + SELECT {schema_madlib}.graph_wcc_reachable_vertices( + wcc_table TEXT, -- Name of the table that contains the WCC output. + src TEXT, -- Initial source vertex. + reachable_vertices_table TEXT -- Name of the output table that + -- contains all vertices in a + -- component reachable from src. + ); + + (5) To count the number of connected components: + SELECT {schema_madlib}.graph_wcc_num_cpts( + wcc_table TEXT, -- Name of the table that contains the WCC output. + count_table TEXT -- Name of the output table that contains the count + -- of number of components. + );""" else: if message is not None and \ message.lower() in ("example", "examples"): @@ -404,6 +773,46 @@ SELECT madlib.weakly_connected_components( -- View the component ID associated with each vertex within the sub-graph -- associated with each user: SELECT * FROM wcc_out ORDER BY user_id, component_id; + +-- Retrieve the largest connected component +DROP TABLE IF EXISTS largest_cpt_table; +SELECT madlib.graph_wcc_largest_cpt( + 'wcc_out', -- WCC's output table + 'largest_cpt_table'); -- output table with largest component IDs +DROP TABLE largest_cpt_table; + +-- There are several helper functions to use after wcc_out is obtained: +-- Retrieve Histogram of Vertices Per Connected Component +DROP TABLE IF EXISTS histogram_table; +SELECT madlib.graph_wcc_histogram( + 'wcc_out', -- WCC's output table + 'histogram_table'); -- output table containing the histogram of vertices +DROP TABLE histogram_table; + +-- Check if Two Vertices Belong to the Same Component +DROP TABLE IF EXISTS 
vc_table; +SELECT madlib.graph_wcc_vertex_check( + 'wcc_out', -- WCC's output table + '14,15', -- Pair of vertex IDs + 'vc_table'); -- output table containing components that contain the + -- two vertices +DROP TABLE vc_table; + +-- Retrieve All Vertices Reachable from a Vertex +DROP TABLE IF EXISTS reach_table; +SELECT madlib.graph_wcc_reachable_vertices( + 'wcc_out', -- WCC's output table + '0', -- source vertex + 'reach_table'); -- output table containing all vertices reachable from + -- source vertex +DROP TABLE reach_table; + +-- Count of Connected Components +DROP TABLE IF EXISTS count_table; +SELECT madlib.graph_wcc_num_cpts( + 'wcc_out', -- WCC's output table + 'count_table'); -- output table containing number of components per group +DROP TABLE count_table; """ else: help_string = """ http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3f599c94/src/ports/postgres/modules/graph/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in index a02db55..d47f06c 100644 --- a/src/ports/postgres/modules/graph/wcc.sql_in +++ b/src/ports/postgres/modules/graph/wcc.sql_in @@ -35,16 +35,22 @@ m4_include(`SQLCommon.m4') <div class="toc"><b>Contents</b> <ul> <li><a href="#wcc">Weakly Connected Components</a></li> +<li><a href="#rlcc">Retrieve Largest Connected Component</a></li> +<li><a href="#hist">Build Histogram</a></li> +<li><a href="#samecpt">Check Vertices in Same Connected Component</a></li> +<li><a href="#reach">Retrieve Reachable Vertices</a></li> +<li><a href="#count">Count Connected Components</a></li> <li><a href="#examples">Examples</a></li> </ul> </div> @brief Find all weakly connected components of a graph. -Given a directed graph, a weakly connected component is a subgraph of the original -graph where all vertices are connected to each other by some path, ignoring the -direction of edges. 
In case of an undirected graph, a weakly connected component is -also a strongly connected component. +Given a directed graph, a weakly connected component (WCC) is a subgraph of +the original graph where all vertices are connected to each other by some path, +ignoring the direction of edges. In case of an undirected graph, a weakly +connected component is also a strongly connected component. This module also +includes a number of helper functions that operate on the WCC output. @anchor wcc @par Weakly Connected Components @@ -91,19 +97,190 @@ the following columns: We use the convention where 'component_id' is the id of the first vertex in a particular group. It means that component ids are generally not contiguous. - - grouping_cols : Grouping column (if any) values associated with the vertex_id.</dd> + - grouping_cols : Grouping column (if any) values associated with the vertex_id. + +A summary table named <out_table>_summary is also created. This is an internal +table that keeps a record of some of the input parameters and is used by the +weakly connected component helper functions. +</dd> <dt>grouping_cols (optional)</dt> <dd>TEXT, default: NULL. A single column or a list of comma-separated -columns that divides the input data into discrete groups, which are +columns that divides the input data into discrete groups, which are treated independently as separate graphs. When this value is NULL, no grouping is used and -weakly connected components are generated for all data +weakly connected components are generated for all data (single graph). @note Expressions are not currently supported for 'grouping_cols'.</dd> </dl> +@anchor rlcc +@par Retrieve Largest Connected Component + +The largest connected component retrieval function finds the largest weakly +connected component(s) in a graph. If weakly connected components was run with +grouping, the largest connected components are computed for each group. 
+ +<pre class="syntax"> +graph_wcc_largest_cpt( wcc_table, + largest_cpt_table + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>wcc_table</dt> +<dd>TEXT. Name of the table that contains the output of weakly connected +components.</dd> + +<dt>largest_cpt_table</dt> +<dd>TEXT. Name of the output table that contains the largest component's +information. It contains one or more rows for every group and has the following +columns: + - grouping_cols: The grouping columns given in the creation of wcc_table. + If there are no grouping columns, this column is not created. + - component_id: The ID of the largest component. Recall that we use the + convention where 'component_id' is the id of the first vertex in a + particular group. It means that component ids are generally not contiguous. + If there are multiple components of the same size, a row is created for each + component. If grouping_cols is specified, the largest + component is computed for each group. + - num_vertices: Number of vertices in the largest component. +</dd> +</dl> + +@anchor hist +@par Retrieve Histogram of Vertices Per Connected Component + +This function creates a histogram of the number of vertices +per connected component. + +<pre class="syntax"> +graph_wcc_histogram( wcc_table, + histogram_table + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>wcc_table</dt> +<dd>TEXT. Name of the table that contains the output of weakly connected +components.</dd> + +<dt>histogram_table</dt> +<dd>TEXT. Name of the output table that contains the number of vertices per +component. A row is created for every component in every group +if grouping_cols was specified when running weakly connected components. +The output table has the following columns: + - grouping_cols: The grouping columns given during the creation of the +wcc_table. If there are no grouping columns, this column +is not created. 
+ - component_id: The ID of the component.
+ - num_vertices: Number of vertices in the component specified by the +component_id column. + +</dd> +</dl> + +@anchor samecpt +@par Check if Two Vertices Belong to the Same Component + +This function determines if two vertices belong to the same component. + +<pre class="syntax"> +graph_wcc_vertex_check( wcc_table, + vertex_pair, + pair_table + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>wcc_table</dt> +<dd>TEXT. Name of the table that contains the output of weakly connected +components.</dd> + +<dt>vertex_pair</dt> +<dd>TEXT. A pair of vertex IDs separated by a comma.</dd> + +<dt>pair_table</dt> +<dd>TEXT. Name of the output table that specifies if the two vertices in +vertex_pair belong to the same component. If wcc_table was generated using +grouping_cols, all the components in all groups are considered. The output +table has the following columns: + - component_id: Component ID that contains both the vertices in vertex_pair. + - grouping_cols: The grouping columns given in the creation of wcc_table. If + there are no grouping columns, this column is not created. + +</dd> +</dl> + +@anchor reach +@par Retrieve All Vertices Reachable from a Vertex + +This function finds all the vertices that can be reached from a given vertex +via weakly connected paths. + +<pre class="syntax"> +graph_wcc_reachable_vertices( wcc_table, + src, + reachable_vertices_table + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>wcc_table</dt> +<dd>TEXT. Name of the table that contains the output of weakly connected +components.</dd> + +<dt>src</dt> +<dd>TEXT. The vertex ID from which all reachable vertices have to be found.</dd> + +<dt>reachable_vertices_table</dt> +<dd>TEXT. Name of the output table that contains the list of vertices that are +reachable from the src vertex. The output table has the following columns: + - grouping_cols : The grouping columns given in the creation of wcc_table. If + there are no grouping columns, this column is not created. 
+ - component_id : The ID of the component that both the src and dest vertices + belong to. + - dest : Vertex ID that is reachable from the src vertex. + Reachability is computed with regard to a component. + +</dd> +</dl> + +@anchor count +@par Count of Connected Components + +This function finds the total number of components in the input graph. + +<pre class="syntax"> +graph_wcc_num_cpts( wcc_table, + count_table + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>wcc_table</dt> +<dd>TEXT. Name of the table that contains the output of weakly connected +components.</dd> + +<dt>count_table</dt> +<dd>TEXT. Name of the output table that contains the total number of components +per group in the graph, if there are any grouping_cols in wcc_table. The output +table has the following columns: + - grouping_cols : The grouping columns given in the creation of wcc_table. + If there are no grouping columns, this column is not created, + and count is with regard to the entire graph. + - num_components : Count of weakly connected components in a graph, or the + number of components within a group if grouping_cols is defined. 
+</dd> + +</dl> + @anchor examples @examp @@ -156,7 +333,7 @@ INSERT INTO edge VALUES -# Find all the weakly connected components in the graph: <pre class="syntax"> -DROP TABLE IF EXISTS wcc_out; +DROP TABLE IF EXISTS wcc_out, wcc_out_summary; SELECT madlib.weakly_connected_components( 'vertex', -- Vertex table 'id', -- Vertix id column @@ -185,10 +362,10 @@ SELECT * FROM wcc_out ORDER BY component_id, id; (14 rows) </pre> --# Now all the weakly connected components associated with each user +-# Now get the weakly connected components associated with each 'user_id' using the grouping feature: <pre class="syntax"> -DROP TABLE IF EXISTS wcc_out; +DROP TABLE IF EXISTS wcc_out, wcc_out_summary; SELECT madlib.weakly_connected_components( 'vertex', -- Vertex table 'id', -- Vertix id column @@ -216,9 +393,96 @@ SELECT * FROM wcc_out ORDER BY user_id, component_id, id; 16 | 14 | 2 (13 rows) </pre> -Note that vertex '4' is not identified as a separate component -in the above result. This is because disconnected nodes cannot be assigned to -a particular group with the current graph representation in MADlib. +Note that vertex 4 is not identified as a separate component +above. This is because there is no entry in the +edge table for vertex 4 indicating which group it belongs to +(you could add such an entry if you wanted vertex 4 assigned to a group).
+ +-# Retrieve the largest connected component: +<pre class="syntax"> +DROP TABLE IF EXISTS largest_cpt_table; +SELECT madlib.graph_wcc_largest_cpt( + 'wcc_out', -- WCC output table + 'largest_cpt_table'); -- output table containing largest component ID +SELECT * FROM largest_cpt_table ORDER BY component_id; +</pre> +<pre class="result"> + user_id | component_id | num_vertices +---------+--------------+-------------- + 1 | 0 | 6 + 2 | 10 | 4 +(2 rows) +</pre> + +-# Retrieve histogram of the number of vertices per +connected component: +<pre class="syntax"> +DROP TABLE IF EXISTS histogram_table; +SELECT madlib.graph_wcc_histogram( + 'wcc_out', -- WCC output table + 'histogram_table'); -- output table containing the histogram of vertices +SELECT * FROM histogram_table ORDER BY component_id; +</pre> +<pre class="result"> + user_id | component_id | num_vertices +---------+--------------+-------------- + 1 | 0 | 6 + 2 | 10 | 4 + 2 | 14 | 3 +(3 rows) +</pre> + +-# Check if two vertices belong to the same component: +<pre class="syntax"> +DROP TABLE IF EXISTS vc_table; +SELECT madlib.graph_wcc_vertex_check( + 'wcc_out', -- WCC output table + '14,15', -- Pair of vertex IDs + 'vc_table'); -- output table containing components that contain the two vertices +SELECT * FROM vc_table ORDER BY component_id; +</pre> +<pre class="result"> + user_id | component_id +---------+-------------- + 2 | 14 +(1 row) +</pre> + +-# Retrieve all vertices reachable from a vertex +<pre class="syntax"> +DROP TABLE IF EXISTS reach_table; +SELECT madlib.graph_wcc_reachable_vertices( + 'wcc_out', -- WCC output table + '0', -- source vertex + 'reach_table'); -- output table containing all vertices reachable from source vertex +SELECT * FROM reach_table ORDER BY component_id, dest; +</pre> +<pre class="result"> + user_id | component_id | dest +---------+--------------+------ + 1 | 0 | 1 + 1 | 0 | 2 + 1 | 0 | 3 + 1 | 0 | 5 + 1 | 0 | 6 +(5 rows) +</pre> + +-# Count of connected components: +<pre 
class="syntax"> +DROP TABLE IF EXISTS count_table; +SELECT madlib.graph_wcc_num_cpts( + 'wcc_out', -- WCC output table + 'count_table'); -- output table containing number of components per group +SELECT * FROM count_table; +</pre> +<pre class="result"> + user_id | num_components +---------+---------------- + 1 | 1 + 2 | 2 +(2 rows) +</pre> */ @@ -249,6 +513,61 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.weakly_connected_components( $$ LANGUAGE sql VOLATILE m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); ------------------------------------------------------------------------- +-- HELPER functions +------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_wcc_largest_cpt( + wcc_table TEXT, + largest_cpt_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, graph_wcc_largest_cpt) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_wcc_histogram( + wcc_table TEXT, + histogram_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, graph_wcc_histogram) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_wcc_vertex_check( + wcc_table TEXT, + vertex_pair TEXT, + pair_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, graph_wcc_vertex_check) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_wcc_reachable_vertices( + wcc_table TEXT, + src TEXT, + reachable_vertices_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, 
graph_wcc_reachable_vertices) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_wcc_num_cpts( + wcc_table TEXT, + count_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, graph_wcc_num_cpts) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +------------------------------------------------------------------------- -- Online help CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.weakly_connected_components(