Github user njayaram2 commented on a diff in the pull request: https://github.com/apache/incubator-madlib/pull/141#discussion_r122523632 --- Diff: src/ports/postgres/modules/graph/bfs.py_in --- @@ -0,0 +1,445 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Breadth-First Search + +# Please refer to the bfs.sql_in file for the documentation + +""" +@file bfs.py_in + +@namespace graph +""" + +import plpy +from graph_utils import * +from utilities.control import MinWarning +from utilities.utilities import _assert +from utilities.utilities import extract_keyvalue_params +from utilities.utilities import split_quoted_delimited_str +from utilities.validate_args import table_exists +from utilities.validate_args import columns_exist_in_table + +m4_changequote(`<!', `!>') + +def _validate_bfs(vertex_table, vertex_id, edge_table, edge_params, + source_vertex, out_table, max_distance, directed, grouping_cols_list, **kwargs): + + validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + out_table,'BFS') + + _assert((max_distance >= 0) and isinstance(max_distance,int), + """Graph BFS: Invalid max_distance type or value ({0}), must be integer, + be greater than or equal to 0 and be less than max allowable integer + (2147483647).""". + format(max_distance)) + + _assert(isinstance(directed,bool), + """Graph BFS: Invalid value for directed ({0}), must be boolean.""". + format(directed)) + + _assert(isinstance(source_vertex,int), + """Graph BFS: Source vertex {source_vertex} has to be an integer.""". + format(**locals())) + src_exists = plpy.execute(""" + SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex} + """.format(**locals())) + if src_exists.nrows() == 0: + plpy.error( + """Graph BFS: Source vertex {source_vertex} is not present in the + vertex table {vertex_table}.""". + format(**locals())) + + vt_error = plpy.execute( + """ SELECT {vertex_id} + FROM {vertex_table} + WHERE {vertex_id} IS NOT NULL + GROUP BY {vertex_id} + HAVING count(*) > 1 """.format(**locals())) + if vt_error.nrows() != 0: + plpy.error( + """Graph BFS: Source vertex table {vertex_table} contains duplicate + vertex id's.""". 
+ format(**locals())) + + _assert(not table_exists(out_table+"_summary"), + "Graph BFS: Output summary table already exists!") + + if grouping_cols_list is not None: + _assert(columns_exist_in_table(edge_table, grouping_cols_list), + """Graph BFS: Not all columns from {grouping_cols_list} are present + in edge table ({edge_table}).""". + format(**locals())) + + return None + +def _grp_from_table(tbl, grp_list): + + """ + Helper function for selecting grouping columns of a table + Args: + @param tbl Name of the table + @param grp_list The list of grouping columns + """ + return ' , '.join([" {tbl}.{i} ".format(**locals()) + for i in grp_list]) + +def _grp_null_checks(grp_list): + + """ + Helper function for generating NULL checks for grouping columns + to be used within a WHERE clause + Args: + @param grp_list The list of grouping columns + """ + return ' AND '.join([" {i} IS NOT NULL ".format(**locals()) + for i in grp_list]) + +def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table, + edge_args, source_vertex, out_table, max_distance, directed, grouping_cols, **kwargs): + + """ + Breadth First Search algorithm for graphs [1]. + Args: + @param vertex_table Name of the table that contains the vertex data. + @param vertex_id Name of the column containing the vertex ids. + @param edge_table Name of the table that contains the edge data. + @param edge_args A comma-delimited string containing multiple + named arguments of the form "name=value". + @param source_vertex The source vertex id for the algorithm to start. + @param out_table Name of the table to store the result of SSSP. + @param max_distance Maximum distance from the source_vertex to search for. + @param directed Graph will be treated as directed if this boolean flag + is set to TRUE. Graph is treated as undirected by default. + @param grouping_cols The list of grouping columns. 
+ + [1] https://en.wikipedia.org/wiki/Breadth-first_search + """ + + with MinWarning("warning"): + + INT_MAX = 2147483647 + INFINITY = "'Infinity'" + EPSILON = 0.000001 + + params_types = {'src': str, 'dest': str, 'weight': str} + default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} + edge_params = extract_keyvalue_params(edge_args, + params_types, + default_args) + + # Prepare the input for recording in the summary table + if vertex_id is None: + v_st= "NULL" + vertex_id = "id" + else: + v_st = vertex_id + if edge_args is None: + e_st = "NULL" + else: + e_st = edge_args + if max_distance is None: + d_st= "NULL" + max_distance = INT_MAX + else: + d_st = max_distance + if directed is None: + dir_st= "NULL" + directed = False + else: + dir_st = directed + if grouping_cols is None: + g_st = "NULL" + glist = None + else: + g_st = grouping_cols + glist = split_quoted_delimited_str(grouping_cols) + + src = edge_params["src"] + dest = edge_params["dest"] + weight = edge_params["weight"] + + distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) + local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY (id)"!>) + + _validate_bfs(vertex_table, vertex_id, edge_table, + edge_params, source_vertex, out_table, max_distance, directed, glist) + + # Initialize grouping related variables + grp_comma = "" + and_grp_null_checks = "" + + if grouping_cols is not None: + grp_comma = grouping_cols + ", " + and_grp_null_checks = " AND " + _grp_null_checks(glist) + + # We keep a table of every vertex, the distance to that vertex from source + # and the parent in the path to the vertex + # This table will be updated throughout the execution. 
+ dist_col = "dist" + parent_col = "parent" + curr_dist_val = 0 + + # Creating the output table with the appropriate columns and data types + plpy.execute(""" + CREATE TABLE {out_table} AS ( + SELECT + {grp_comma} + {src} AS {vertex_id}, + {curr_dist_val}::INT AS {dist_col}, + {src} AS {parent_col} + FROM {edge_table} + LIMIT 0 + ) {distribution}""".format(**locals())) + + # We keep a summary table to keep track of the parameters used for this + # BFS run + plpy.execute( """ + CREATE TABLE {out_table}_summary ( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + source_vertex INTEGER, + out_table TEXT, + max_distance INTEGER, + directed BOOLEAN, + grouping_cols TEXT + )""".format(**locals())) + plpy.execute(""" + INSERT INTO {out_table}_summary VALUES + ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', + {source_vertex}, '{out_table}', {d_st}, {dir_st}, '{g_st}') + """.format(**locals())) + + + # Initialization is different for directed and undirected graphs + # In the undirected case edges can be considered to go from {src} to + # {dest} and {dest} to {src} + + # This step inserts into the output table the source vertex for each + # group in which it is present. Grouping behavior is not predictable + # when there are NULLs in any grouping column. Therefore those rows + # are explicitly removed from analysis + + # After initialization of the output table, number of nodes connected + # by edges to the source vertex in each group is counted. 
This is used + # below in the BFS iteration loop + + if directed: + plpy.execute(""" + INSERT INTO {out_table} + SELECT {grp_comma} + {source_vertex} AS {vertex_id}, + {curr_dist_val} AS {dist_col}, + NULL AS {parent_col} + FROM {edge_table} + WHERE {src} = {source_vertex} + {and_grp_null_checks} + GROUP BY {grp_comma} {vertex_id}, {dist_col} + """.format(**locals())) + + vct = plpy.execute(""" + SELECT COUNT(*) + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + """.format(**locals()))[0]['count'] + + else: + plpy.execute(""" + INSERT INTO {out_table} + SELECT {grp_comma} + {source_vertex} AS {vertex_id}, + {curr_dist_val} AS {dist_col}, + NULL AS {parent_col} + FROM {edge_table} + WHERE + ({src} = {source_vertex} OR {dest} = {source_vertex}) + {and_grp_null_checks} + GROUP BY {grp_comma} {vertex_id}, {dist_col} + """.format(**locals())) + + vct = plpy.execute(""" + SELECT COUNT(*) + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) OR ( + ({grp_comma} {dest}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {src}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + """.format(**locals()))[0]['count'] + + # Main loop for traversing the graph + while vct > 0 and curr_dist_val < max_distance: + + # The loop consists of two steps: + # 1) Discover and store all nodes that are linked to nodes found in + # the immediate previous iteration of the loop that have not already + # been found in all previous iterations + # 2) Check for any nodes linked to those discovered in Step 1 above + # that have not yet been 
discovered + # + # If a node has multiple possible parents then the node with the + # smallest ID is chosen as the parent for output + + if directed: + # In the directed graph case only nodes in the {dest} column of + # the edge table are searched to find new nodes reachable from + # previously discovered nodes + plpy.execute(""" + INSERT INTO {out_table} + SELECT {grp_comma} {vertex_id}, {dist_col}, min({parent_col}) + FROM ( + SELECT {grp_comma} + {dest} AS {vertex_id}, + {curr_dist_val}+1 AS {dist_col}, + {src} AS {parent_col} + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + ) t1 + GROUP BY {grp_comma} {vertex_id}, {dist_col} + """.format(**locals())) + + curr_dist_val = curr_dist_val + 1 + + vct = plpy.execute(""" + SELECT COUNT(*) + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + """.format(**locals()))[0]['count'] + + else: + # In the undirected graph case edges are treated as non-directional + # (or bidirectional). Nodes in both the {src} and {dest} columns of + # the edge table are searched to find new nodes reachable from + # previously discovered nodes. + # This approach does NOT require the user to provide a forward edge + # and a reverse edge between the same two nodes to indicate the + # graph's undirected nature. However, it will work in that scenario + # as well. 
+ + plpy.execute(""" + INSERT INTO {out_table} + SELECT {grp_comma} {vertex_id}, {dist_col}, min({parent_col}) + FROM ( + SELECT {grp_comma} + {dest} AS {vertex_id}, + {curr_dist_val}+1 AS {dist_col}, + {src} AS {parent_col} + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + UNION + SELECT {grp_comma} + {src} AS {vertex_id}, + {curr_dist_val}+1 AS {dist_col}, + {dest} AS {parent_col} + FROM {edge_table} + WHERE ( + ({grp_comma} {dest}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {src}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + ) t1 + GROUP BY {grp_comma} {vertex_id}, {dist_col} + """.format(**locals())) + + curr_dist_val = curr_dist_val + 1 + + vct = plpy.execute(""" + SELECT COUNT(*) + FROM {edge_table} + WHERE ( + ({grp_comma} {src}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {dest}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) OR ( + ({grp_comma} {dest}) IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + WHERE {dist_col}={curr_dist_val} + ) + AND + ({grp_comma} {src}) NOT IN ( + SELECT {grp_comma} {vertex_id} FROM {out_table} + ) + ) + """.format(**locals()))[0]['count'] + --- End diff -- We could probably replace the if-else with a single block here too, similar to the comment on line 312.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---