Github user njayaram2 commented on a diff in the pull request: https://github.com/apache/incubator-madlib/pull/144#discussion_r124682273 --- Diff: src/ports/postgres/modules/graph/wcc.py_in --- @@ -0,0 +1,329 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Weakly Connected Components + +# Please refer to the wcc.sql_in file for the documentation + +""" +@file wcc.py_in + +@namespace graph +""" + +import plpy +from utilities.control import MinWarning +from utilities.utilities import _assert +from utilities.utilities import extract_keyvalue_params +from utilities.utilities import unique_string, split_quoted_delimited_str +from utilities.validate_args import columns_exist_in_table, get_cols_and_types +from graph_utils import * + +m4_changequote(`<!', `!>') + + +def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, + edge_params, out_table, grouping_cols_list, module_name): + """ + Function to validate input parameters for wcc + """ + validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + out_table, module_name) + if grouping_cols_list: + # validate the grouping columns. We currently only support grouping_cols + # to be column names in the edge_table, and not expressions! 
+ _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib), + "Weakly Connected Components error: One or more grouping columns specified do not exist!") + with MinWarning("warning"): + plpy.warning("Grouping is not currently supported at the moment.") + +def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, + out_table, grouping_cols, **kwargs): + """ + Function that computes the wcc + + Args: + @param vertex_table + @param vertex_id + @param edge_table + @param dest_vertex + @param out_table + @param grouping_cols + """ + old_msg_level = plpy.execute(""" + SELECT setting + FROM pg_settings + WHERE name='client_min_messages' + """)[0]['setting'] + plpy.execute('SET client_min_messages TO warning') + params_types = {'src': str, 'dest': str} + default_args = {'src': 'src', 'dest': 'dest'} + edge_params = extract_keyvalue_params(edge_args, + params_types, default_args) + + # populate default values for optional params if null + if vertex_id is None: + vertex_id = "id" + if not grouping_cols: + grouping_cols = '' + + grouping_cols_list = split_quoted_delimited_str(grouping_cols) + validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, + edge_params, out_table, grouping_cols_list, 'Weakly Connected Components') + src = edge_params["src"] + dest = edge_params["dest"] + + message = unique_string(desp='message') + oldupdate = unique_string(desp='oldupdate') + newupdate = unique_string(desp='newupdate') + toupdate = unique_string(desp='toupdate') + temp_out_table = unique_string(desp='tempout') + + distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) + + is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) + + INT_MAX = 2147483647 + component_id = 'component_id' + plpy.execute(""" + CREATE TABLE {newupdate} AS + SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) + if is_hawq: + plpy.execute(""" + 
CREATE TABLE {temp_out_table} AS + SELECT * FROM {newupdate} + LIMIT 0 + {distribution} + """.format(**locals())) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) + nodes_to_update = 1 + while nodes_to_update > 0: + # This idea here is simple. Look at all the neighbors of a node, and + # assign the smallest node id among the neighbors as its component_id. + # The next table starts off with very high component_id (INT_MAX). The + # component_id of all nodes which obtain a smaller component_id after + # looking at its neighbors are updated in the next table. At every + # iteration update only those nodes whose component_id in the previous + # iteration are greater than what was found in the current iteration. + plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) + plpy.execute(""" + CREATE TEMP TABLE {oldupdate} AS + SELECT {message}.{vertex_id}, + MIN({message}.{component_id}) AS {component_id} + FROM {message} + GROUP BY {vertex_id} + {distribution} + """.format(**locals())) + + plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) + plpy.execute(""" + CREATE TEMP TABLE {toupdate} AS + SELECT {oldupdate}.{vertex_id}, {oldupdate}.{component_id} + FROM {oldupdate}, {newupdate} + WHERE {oldupdate}.{vertex_id}={newupdate}.{vertex_id} + AND {oldupdate}.{component_id}<{newupdate}.{component_id} + {distribution} + """.format(**locals())) + + if is_hawq: + plpy.execute(""" + TRUNCATE TABLE {temp_out_table}; + INSERT INTO {temp_out_table} + SELECT * + FROM {newupdate} + WHERE NOT EXISTS ( + SELECT * + FROM {toupdate} + WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + ) + UNION + SELECT * FROM {toupdate}; + """.format(**locals())) + plpy.execute("DROP TABLE {0}".format(newupdate)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table, + newupdate)) + plpy.execute(""" + CREATE TABLE {temp_out_table} AS + SELECT * 
FROM {newupdate} + LIMIT 0 + {distribution} + """.format(**locals())) + else: + plpy.execute(""" + UPDATE {newupdate} SET + {component_id}={toupdate}.{component_id} + FROM {toupdate} + WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + """.format(**locals())) + + plpy.execute("DROP TABLE IF EXISTS {0}".format(message)) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, MIN({component_id}) AS {component_id} + FROM ( + SELECT {edge_table}.{src} AS {vertex_id}, {toupdate}.{component_id} + FROM {toupdate}, {edge_table} + WHERE {edge_table}.{dest} = {toupdate}.{vertex_id} + UNION ALL + SELECT {edge_table}.{dest} AS {vertex_id}, {toupdate}.{component_id} + FROM {toupdate}, {edge_table} --- End diff -- What this query is trying to do is the following: For each node, find all its neighbors and assign this node's `component_id` as their `component_id`. The `MIN()` in the outer `SELECT` clause will then choose the minimum of all possible neighbors' component_id and assign it to a node. I don't think the suggestion in this comment will help me do it any more efficiently. We will still have to do the joins, since the suggested query will only find all edges in the `edge_table` that have `{vertex_id}` in either src or dest. Hence assuming a no-op on this.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to do so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---