Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/141#discussion_r124887868
  
    --- Diff: src/ports/postgres/modules/graph/bfs.py_in ---
    @@ -0,0 +1,498 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +# Breadth-First Search
    +
    +# Please refer to the bfs.sql_in file for the documentation
    +
    +"""
    +@file bfs.py_in
    +
    +@namespace graph
    +"""
    +
    +import plpy
    +from graph_utils import validate_graph_coding
    +from graph_utils import get_graph_usage
    +from graph_utils import _grp_null_checks
    +from utilities.control import MinWarning
    +from utilities.utilities import _assert
    +from utilities.utilities import extract_keyvalue_params
    +from utilities.utilities import split_quoted_delimited_str
    +from utilities.validate_args import table_exists
    +from utilities.validate_args import columns_exist_in_table
    +
    +m4_changequote(`<!', `!>')
    +
    +def _validate_bfs(vertex_table, vertex_id, edge_table, edge_params,
    +    source_vertex, out_table, max_distance, directed, grouping_cols_list, 
**kwargs):
    +
    +    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
    +        out_table,'BFS')
    +
    +    _assert((max_distance >= 0) and isinstance(max_distance,int),
    +        """Graph BFS: Invalid max_distance type or value ({0}), must be 
integer, 
    +        be greater than or equal to 0 and be less than max allowable 
integer 
    +        (2147483647).""".
    +        format(max_distance))
    +
    +    _assert(isinstance(directed,bool),
    +        """Graph BFS: Invalid value for directed ({0}), must be 
boolean.""".
    +        format(directed))
    +
    +    _assert(isinstance(source_vertex,int),
    +        """Graph BFS: Source vertex {source_vertex} has to be an 
integer.""".
    +        format(**locals()))
    +    src_exists = plpy.execute("""
    +        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
    +        """.format(**locals()))
    +    if src_exists.nrows() == 0:
    +        plpy.error(
    +            """Graph BFS: Source vertex {source_vertex} is not present in 
the 
    +            vertex table {vertex_table}.""".
    +            format(**locals()))
    +
    +    vt_error = plpy.execute(
    +        """ SELECT {vertex_id}
    +            FROM {vertex_table}
    +            WHERE {vertex_id} IS NOT NULL
    +            GROUP BY {vertex_id}
    +            HAVING count(*) > 1 """.format(**locals()))
    +    if vt_error.nrows() != 0:
    +        plpy.error(
    +            """Graph BFS: Source vertex table {vertex_table} contains 
duplicate 
    +            vertex id's.""".
    +            format(**locals()))
    +
    +    _assert(not table_exists(out_table+"_summary"),
    +        "Graph BFS: Output summary table already exists!")
    +
    +    if grouping_cols_list is not None:
    +        _assert(columns_exist_in_table(edge_table, grouping_cols_list),
    +            """Graph BFS: Not all columns from {grouping_cols_list} are 
present 
    +            in edge table ({edge_table}).""".
    +            format(**locals()))
    +
    +    return None
    +
    +
    +def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
    +        edge_args, source_vertex, out_table, max_distance, directed, 
grouping_cols,
    +        **kwargs):
    +
    +    """
    +    Breadth First Search algorithm for graphs [1].
    +    Args:
    +        @param vertex_table    Name of the table that contains the vertex 
data.
    +        @param vertex_id       Name of the column containing the vertex 
ids.
    +        @param edge_table      Name of the table that contains the edge 
data.
    +        @param edge_args       A comma-delimited string containing multiple
    +                               named arguments of the form "name=value".
    +        @param source_vertex   The source vertex id for the algorithm to 
start.
    +        @param out_table       Name of the table to store the result of 
SSSP.
    +        @param max_distance    Maximum distance from the source_vertex to 
search for.
    +        @param directed        Graph will be treated as directed if this 
boolean flag 
    +                               is set to TRUE. Graph is treated as 
undirected by default.
    +        @param grouping_cols   The list of grouping columns.
    +
    +    [1] https://en.wikipedia.org/wiki/Breadth-first_search
    +    """
    +
    +    with MinWarning("warning"):
    +
    +        INT_MAX = 2147483647
    +
    +        params_types = {'src': str, 'dest': str}
    +        default_args = {'src': 'src', 'dest': 'dest'}
    +        edge_params = extract_keyvalue_params(edge_args,
    +                                            params_types,
    +                                            default_args)
    +
    +        # Prepare the input for recording in the summary table
    +        if vertex_id is None:
    +            v_st= "NULL"
    +            vertex_id = "id"
    +        else:
    +            v_st = vertex_id
    +        if edge_args is None:
    +            e_st = "NULL"
    +        else:
    +            e_st = edge_args
    +        if max_distance is None:
    +            d_st= "NULL"
    +            max_distance = INT_MAX
    +        else:
    +            d_st = max_distance
    +        if directed is None:
    +            dir_st= "NULL"
    +            directed = False
    +        else:
    +            dir_st = directed
    +        if grouping_cols is None:
    +            g_st = "NULL"
    +            glist = None
    +        else:
    +            g_st = grouping_cols
    +            glist = split_quoted_delimited_str(grouping_cols)
    +
    +        src = edge_params["src"]
    +        dest = edge_params["dest"]
    +        
    +        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
    +            <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
    +        local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
    +            <!"DISTRIBUTED BY (id)"!>)
    +
    +        _validate_bfs(vertex_table, vertex_id, edge_table,
    +            edge_params, source_vertex, out_table, max_distance, directed, 
glist)
    +
    +        # Initialize grouping related variables
    +        grp_comma = ""
    +        and_grp_null_checks = ""
    +
    +        if grouping_cols is not None:
    +            grp_comma = grouping_cols + ", "
    +            and_grp_null_checks = " AND " + _grp_null_checks(glist)
    +            
    +        # We keep a table of every vertex, the distance to that vertex 
from source
    +        # and the parent in the path to the vertex
    +        # This table will be updated throughout the execution.
    +        dist_col = "dist"
    +        parent_col = "parent"
    +        curr_dist_val = 0
    +
    +        # Creating the output table with the appropriate columns and data 
types
    +        plpy.execute("""
    +            CREATE TABLE {out_table} AS ( 
    +                SELECT
    +                    {grp_comma} 
    +                    {src} AS {vertex_id}, 
    +                    {curr_dist_val}::INT AS {dist_col},
    +                    {src} AS {parent_col}
    +                FROM {edge_table} 
    +                LIMIT 0
    +            ) {distribution}""".format(**locals()))
    +
    +        # We keep a summary table to keep track of the parameters used for 
this
    +        # BFS run
    +        plpy.execute( """ 
    +            CREATE TABLE {out_table}_summary  (
    +                vertex_table            TEXT,
    +                vertex_id               TEXT,
    +                edge_table              TEXT,
    +                edge_args               TEXT,
    +                source_vertex           INTEGER,
    +                out_table               TEXT,
    +                max_distance            INTEGER,
    +                directed                BOOLEAN,
    +                grouping_cols           TEXT
    +            )""".format(**locals()))
    +        plpy.execute(""" 
    +            INSERT INTO {out_table}_summary VALUES
    +                ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
    +                {source_vertex}, '{out_table}', {d_st}, {dir_st}, '{g_st}')
    +            """.format(**locals()))
    +
    +        
    +        # Initialization is different for directed and undirected graphs
    +        # In the undirected case edges can be considered to go from {src} 
to
    +        # {dest} and {dest} to {src}
    +
    +        # This step inserts into the output table the source vertex for 
each 
    +        # group in which it is present. Grouping behavior is not 
predictable
    +        # when there are NULLs in any grouping column. Therefore those rows
    +        # are explicitly removed from analysis
    +
    +        # After initialization of the output table, number of nodes 
connected
    +        # by edges to the source vertex in each group is counted. This is 
used 
    +        # below in the BFS iteration loop
    +
    +        insert_qry_undirected_init = ""
    +        count_qry_undirected_init = ""
    +
    +        if not directed:
    +            insert_qry_undirected_init = """ OR {dest} = {source_vertex}
    +                """.format(**locals())
    +
    +            count_qry_undirected_init = """ OR (
    +                    ({grp_comma} {dest}) IN ( 
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table} 
    +                        WHERE {dist_col}={curr_dist_val}
    +                    ) 
    +                    AND 
    +                    ({grp_comma} {src}) NOT IN (
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                    )
    +                )
    +            """.format(**locals())
    +
    +        plpy.execute("""
    +                INSERT INTO {out_table}
    +                SELECT {grp_comma} 
    +                    {source_vertex} AS {vertex_id},
    +                    {curr_dist_val} AS {dist_col},
    +                    NULL AS {parent_col}
    +                FROM {edge_table}
    +                WHERE ({src} = {source_vertex} 
{insert_qry_undirected_init})
    +                    {and_grp_null_checks}
    +                GROUP BY {grp_comma} {vertex_id}, {dist_col}
    +            """.format(**locals()))
    +
    +        vct = plpy.execute("""
    +            SELECT COUNT(*)
    +            FROM {edge_table} 
    +            WHERE (
    +                ({grp_comma} {src}) IN (
    +                    SELECT {grp_comma} {vertex_id} FROM {out_table} 
    +                    WHERE {dist_col}={curr_dist_val}
    +                ) 
    +                AND 
    +                ({grp_comma} {dest}) NOT IN (
    +                    SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                )
    +            ) {count_qry_undirected_init}
    +        """.format(**locals()))[0]['count']
    +        
    +        # Main loop for traversing the graph
    +        while vct > 0 and curr_dist_val < max_distance:
    +
    +            # The loop consists of two steps:
    +            # 1) Disover and store all nodes that are linked to nodes 
found in 
    +            #    the immediate previous iteration of the loop that have 
not already 
    +            #    been found in all previous iterations  
    +            # 2) Check for any nodes linked to those discovered in Step 1 
above
    +            #    that have not yet been discovered
    +            # 
    +            # If a node has multiple possible parents then the parent with 
the 
    +            # smallest ID is chosen for output
    +
    +            # In the directed graph case only nodes in the {dest} column 
of 
    +            # the edge table are searched to find new nodes reachable from 
    +            # previously discovered nodes
    +
    +            # In the undirected graph case edges are treated as 
non-directional 
    +            # (or bidirectional). Nodes in both the {src} and {dest} 
columns of 
    +            # the edge table are searched to find new nodes reachable from 
    +            # previously discovered nodes.
    +            # This approach does NOT require the user to provide a forward 
edge
    +            # and a reverse edge between the same two nodes to indicate 
the 
    +            # graph's undirected nature. However, it will work in that 
scenario
    +            # as well.
    +
    +            insert_qry_undirected_part = ""
    +            count_qry_undirected_part = ""
    +            
    +            if not directed:
    +                insert_qry_undirected_part = """ UNION
    +                    SELECT {grp_comma} 
    +                        {src} AS {vertex_id}, 
    +                        {curr_dist_val}+1 AS {dist_col}, 
    +                        {dest} AS {parent_col}
    +                    FROM {edge_table} 
    +                    WHERE (
    +                        ({grp_comma} {dest}) IN (
    +                            SELECT {grp_comma} {vertex_id} FROM 
{out_table} 
    +                            WHERE {dist_col}={curr_dist_val}
    +                        ) 
    +                        AND 
    +                        ({grp_comma} {src}) NOT IN (
    +                            SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                        )
    +                    )
    +                """.format(**locals())
    +                
    +                count_qry_undirected_part = """ OR (
    +                    ({grp_comma} {dest}) IN ( 
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table} 
    +                        WHERE {dist_col}={curr_dist_val}
    +                    ) 
    +                    AND 
    +                    ({grp_comma} {src}) NOT IN (
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                    )
    +                )
    +                """.format(**locals())
    +
    +            # Discover and store all nodes (not already found) connected 
to 
    +            # those found in the immediate previous iteration
    +            plpy.execute("""
    +                INSERT INTO {out_table}
    +                SELECT {grp_comma} {vertex_id}, {dist_col}, 
min({parent_col})
    +                FROM (
    +                    SELECT {grp_comma} 
    +                        {dest} AS {vertex_id}, 
    +                        {curr_dist_val}+1 AS {dist_col}, 
    +                        {src} AS {parent_col}
    +                    FROM {edge_table} 
    +                    WHERE (
    +                        ({grp_comma} {src}) IN (
    +                            SELECT {grp_comma} {vertex_id} FROM 
{out_table} 
    +                            WHERE {dist_col}={curr_dist_val}
    +                        ) 
    +                        AND 
    +                        ({grp_comma} {dest}) NOT IN (
    +                            SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                        )
    +                    )
    +                    {insert_qry_undirected_part}
    +                ) t1
    +                GROUP BY {grp_comma} {vertex_id}, {dist_col}
    +            """.format(**locals()))
    +
    +            curr_dist_val = curr_dist_val + 1
    +
    +            # Count / find any nodes that are connected to those 
discovered and 
    +            # stored in this iteration. This is used to check if the 
iterations
    +            # need to continue.
    +            vct = plpy.execute("""
    +                SELECT COUNT(*) 
    +                FROM {edge_table} 
    +                WHERE (
    +                    ({grp_comma} {src}) IN ( 
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table} 
    +                        WHERE {dist_col}={curr_dist_val}
    +                    ) 
    +                    AND 
    +                    ({grp_comma} {dest}) NOT IN (
    +                        SELECT {grp_comma} {vertex_id} FROM {out_table}
    +                    )
    +                ) {count_qry_undirected_part}
    +            """.format(**locals()))[0]['count']
    +
    +    return None
    +
    +def graph_bfs_help(schema_madlib, message, **kwargs):
    +    """
    +    Help function for graph_bfs
    +
    +    Args:
    +        @param schema_madlib
    +        @param message: string, Help message string
    +        @param kwargs
    +
    +    Returns:
    +        String. Help/usage information
    +    """
    +
    +    if not message:
    +        help_string = """
    +-----------------------------------------------------------------------
    +                            SUMMARY
    +-----------------------------------------------------------------------
    +
    +Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
    +finds all nodes reachable from the source vertex by searching / traversing 
the graph 
    +in a breadth-first manner.
    +
    +For more details on function usage:
    +    SELECT {schema_madlib}.graph_bfs('usage')
    +            """
    +    elif message.lower() in ['usage', 'help', '?']:
    +        help_string = """
    +Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
    +finds all nodes reachable from the source vertex by searching / traversing 
the graph 
    +in a breadth-first manner.
    +
    +{graph_usage}
    +
    
+----------------------------------------------------------------------------
    +                            OUTPUT
    
+----------------------------------------------------------------------------
    +The output of BFS ('out_table' above) contains a row for every vertex of 
that is 
    +reachable from the source_vertex. In the presence of grouping columns, 
only those 
    +edges are used for which there are no NULL values in any grouping column.
    +The output table will have the following columns (in addition to the 
    +grouping columns):
    +  - vertex_id : The id for any node reachable from source_vertex. 
    +                Will use the input parameter 'vertex_id' for column naming.
    +  - dist      : The number of edges (or hops) from the source_vertex to 
where 
    +                this vertex is located. 
    +  - parent    : The parent of this vertex in BFS traversal of the graph 
from 
    +                source_vertex. Will use 'parent' for column naming. For 
the 
    +                case where vertex_id = source_vertex, the value for parent 
is NULL.
    +"""
    +    elif message.lower() in ("example", "examples"):
    +        help_string = """
    
+----------------------------------------------------------------------------
    +                                EXAMPLES
    
+----------------------------------------------------------------------------
    +-- Create a graph, represented as vertex and edge tables.
    +DROP TABLE IF EXISTS vertex, edge;
    +CREATE TABLE vertex(
    +        id INTEGER
    +        );
    +CREATE TABLE edge(
    +        src INTEGER,
    +        dest INTEGER
    +        );
    +INSERT INTO vertex VALUES
    +(0),
    +(1),
    +(2),
    +(3),
    +(4),
    +(5),
    +(6),
    +(7),
    +(8),
    +(9),
    +(10),
    +(11)
    +;
    +INSERT INTO edge VALUES
    +(0, 5),
    +(1, 0),
    +(1, 3),
    +(2, 6),
    +(3, 4),
    +(3, 5),
    +(4, 2),
    +(8, 9),
    +(9, 10),
    +(9, 11),
    +(10, 8)
    +;
    +
    +-- Traverse undirected graph from vertex 3:
    +DROP TABLE IF EXISTS out, out_summary;
    +SELECT madlib.graph_bfs(
    +                         'vertex',      -- Vertex table
    +                         NULL,          -- Vertix id column (NULL means 
use default naming)
    +                         'edge',        -- Edge table
    +                         NULL,          -- Edge arguments (NULL means use 
default naming)
    +                         3,             -- Source vertex for BFS
    +                         'out'          -- Output table of nodes reachable 
from source_vertex
    +                        );        
    +                        -- Default values used for the other arguments
    +SELECT * FROM out ORDER BY dist,id;
    +
    +SELECT * FROM out_summary;
    +
    +"""
    +    else:
    +        help_string = "No such option. Use {schema_madlib}.graph_sssp()"
    +
    --- End diff --
    
    `graph_sssp()` -> `graph_bfs()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to