Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/113#discussion_r110786784
  
    --- Diff: src/ports/postgres/modules/graph/sssp.py_in ---
    @@ -198,110 +378,228 @@ def graph_sssp(schema_madlib, vertex_table, 
vertex_id, edge_table,
                        # values.
     
                        sql = (""" INSERT INTO {newupdate}
    -                           SELECT DISTINCT ON (message.id) message.id AS 
id,
    -                                   message.val AS val,
    -                                   message.parent AS parent
    +                           SELECT DISTINCT ON (message.id {comma_grp})
    +                                   message.id AS id,
    +                                   message.{weight} AS {weight},
    +                                   message.parent AS parent {comma_grp_m}
                                FROM {out_table} AS out_table INNER JOIN
                                        (
    -                                           SELECT edge_table.{dest} AS id, 
x.val AS val,
    -                                                   oldupdate.id AS parent
    +                                   SELECT {edge_table}.{dest} AS id, 
x.{weight} AS {weight},
    +                                           oldupdate.id AS parent 
{comma_grp_e}
    +                                   FROM {oldupdate} AS oldupdate INNER JOIN
    +                                           {edge_table}  ON
    +                                                   ({edge_table}.{src} = 
oldupdate.id {checkg_eo})
    +                                           INNER JOIN
    +                                           (
    +                                           SELECT {edge_table}.{dest} AS 
id,
    +                                                   min(oldupdate.{weight} +
    +                                                           
{edge_table}.{weight}) AS {weight} {comma_grp_e}
                                                FROM {oldupdate} AS oldupdate 
INNER JOIN
    -                                                   {edge_table} AS 
edge_table ON
    -                                                   (edge_table.{src} = 
oldupdate.id) INNER JOIN
    -                                                   (
    -                                                           SELECT 
edge_table.{dest} AS id,
    -                                                                   
min(oldupdate.val + edge_table.{weight})
    -                                                                   AS val
    -                                                           FROM 
{oldupdate} AS oldupdate INNER JOIN
    -                                                                   
{edge_table} AS edge_table ON
    -                                                                   
(edge_table.{src}=oldupdate.id)
    -                                                           GROUP BY 
edge_table.{dest}
    -                                                   ) x ON 
(edge_table.{dest} = x.id)
    -                                           WHERE ABS(oldupdate.val + 
edge_table.{weight} - x.val)
    -                                                   < {EPSILON}
    -                                   ) AS message ON (message.id = 
out_table.{vertex_id})
    -                           WHERE message.val<out_table.{weight}
    +                                                   {edge_table}  ON
    +                                                   
({edge_table}.{src}=oldupdate.id {checkg_eo})
    +                                           GROUP BY {edge_table}.{dest} 
{comma_grp_e}
    +                                           ) x
    +                                           ON ({edge_table}.{dest} = x.id 
{checkg_ex} )
    +                                   WHERE ABS(oldupdate.{weight} + 
{edge_table}.{weight}
    +                                                           - x.{weight}) < 
{EPSILON}
    +                                   ) message
    +                                   ON (message.id = out_table.{vertex_id} 
{checkg_om})
    +                           WHERE message.{weight}<out_table.{weight}
                                """.format(**locals()))
     
    -                   # If there are no updates, SSSP is finalized
    -                   ret = plpy.execute(sql)
    -                   if ret.nrows() == 0:
    -                           break
    +                   plpy.execute(sql)
     
    -                   # Swap the update tables for the next iteration
    +                   # Swap the update tables for the next iteration.
                        tmp = oldupdate
                        oldupdate = newupdate
                        newupdate = tmp
     
    -           # Bellman-Ford should converge in |V|-1 iterations.
    +           # The algorithm should converge in less than |V| iterations.
    +           # Otherwise there is a negative cycle in the graph.
                if i == v_cnt:
    -                   plpy.execute("DROP TABLE IF EXISTS 
{out_table}".format(**locals()))
    -                   plpy.error("Graph SSSP: Detected a negative cycle in 
the graph.")
    -
    -           m4_ifdef(<!__HAWQ__!>,
    -                   plpy.execute("DROP TABLE {temp_table} 
".format(**locals())), <!''!>)
    +                   if grouping_cols is None:
    +                           plpy.execute("DROP TABLE IF EXISTS {out_table}".
    +                                   format(**locals()))
    +                           plpy.error("Graph SSSP: Detected a negative 
cycle in the graph.")
    +
    +                   # It is possible that not all groups has negative 
cycles.
    +                   else:
    +
    +                           # gsql is the string created by collating 
grouping columns.
    +                           # By looking at the oldupdate table we can see 
which groups
    +                           # are in a negative cycle.
    +
    +                           negs = plpy.execute(
    +                                   """ SELECT array_agg(DISTINCT 
({grouping_cols})) AS grp
    +                                           FROM {oldupdate}
    +                                   """.format(**locals()))[0]['grp']
    +
    +                           # Delete the groups with negative cycles from 
the output table.
    +                           sql_del = """ DELETE FROM {out_table}
    +                                   USING {oldupdate} AS oldupdate
    +                                   WHERE {checkg_oo_sub}"""
    +                           if is_hawq:
    +                                   sql_del = """
    +                                           TRUNCATE TABLE {temp_table};
    +                                           INSERT INTO {temp_table}
    +                                                   SELECT *
    +                                                   FROM {out_table}
    +                                                   WHERE NOT EXISTS(
    +                                                           SELECT 1
    +                                                           FROM 
{oldupdate} as oldupdate
    +                                                           WHERE 
{checkg_oo_sub}
    +                                                           );
    +                                           DROP TABLE {out_table};
    +                                           ALTER TABLE {temp_table} RENAME 
TO {out_table};"""
    +
    +                           plpy.execute(sql_del.format(**locals()))
    +
    +                           # If every group has a negative cycle,
    +                           # drop the output table as well.
    +                           if table_is_empty(out_table):
    +                                   plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".
    +                                           
format(out_table,out_table+"_summary"))
    +
    +                           plpy.warning(
    +                                   """Graph SSSP: Detected a negative 
cycle in the """ +
    +                                   """sub-graphs of following groups: 
{0}.""".
    +                                   format(str(negs)[1:-1]))
    +
    +           if is_hawq:
    +                   plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
    +                           format(**locals()))
     
        return None
     
    -def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
    +def _path_helper(plan_name, dest_vertex, path_table, gset):
        """
    -   Helper function that can be used to get the shortest path for a vertex
    -    Args:
    -           @param source_table     Name of the table that contains the 
SSSP output.
    -        @param out_table   The vertex that will be the destination of the
    -                                           desired path.
    +   Helper function for graph_sssp_get_path. Invoked once per group.
    +   @param plan_name    Name of the plan that finds the parent of a given
    +                       vertex.
    +    @param dest_vertex  The vertex that will be the destination of the
    +                        desired path.
    +    @param path_table   Name of the output table that will contain the 
path(s).
    +    @param gset            List of grouping columns values followed by a 
comma.
    +                        Empty string if there are no grouping columns.
        """
     
    -   validate_get_path(sssp_table, dest_vertex)
    +   ret = [str(dest_vertex)]
        cur = dest_vertex
    -   cols = get_cols(sssp_table)
    -   id = cols[0]
    -   ret = [dest_vertex]
    -   plan_name = unique_string(desp='plan')
    -
    -   # Follow the 'parent' chain until you reach the source.
    -   # We don't need to know what the source is since it is the only vertex 
with
    -   # itself as its parent
    -   plpy.execute(""" PREPARE {plan_name} (int) AS
    -           SELECT parent FROM {sssp_table} WHERE {id} = $1 LIMIT 1
    -           """.format(**locals()))
        sql = "EXECUTE {plan_name} ({cur})"
        parent = plpy.execute(sql.format(**locals()))
    +   if parent.nrows() > 0:
    +           while 1:
    +                   parent = parent[0]['parent']
    +                   if parent == cur:
    +                           ret.reverse()
    +                           ret = ",".join(ret)
    +                           plpy.execute(
    +                                   """ INSERT INTO {path_table} VALUES 
({gset} '{ret}')
    +                                   """.format(**locals()))
    +                           return True
    +                   else:
    +                           ret.append(str(parent))
    +                           cur = parent
    +
    +                   parent = plpy.execute(sql.format(**locals()))
    +
    +   return False
    +
    +def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
    +   **kwargs):
    +   """
    +    Helper function that can be used to get the shortest path for a vertex
    +    Args:
    +        @param source_table Name of the table that contains the SSSP 
output.
    +        @param out_table    The vertex that will be the destination of the
    +                            desired path.
    +        @param path_table   Name of the output table that contains the 
path.
    +   """
    +   with MinWarning("warning"):
    +
    +           _validate_get_path(sssp_table, dest_vertex)
    +           plan_name = unique_string(desp='plan')
    +
    +           select_grps = ""
    +           gsql = ""
    +
    +           path_flag = False
    +
    +           summary = plpy.execute("SELECT * FROM 
{0}_summary".format(sssp_table))
    +           vertex_id = summary[0]['vertex_id']
    +           if vertex_id == "NULL":
    +                   vertex_id = "id"
    +
    +           grouping_cols = summary[0]['grouping_cols']
    +           if grouping_cols != "NULL":
    +                   glist = split_quoted_delimited_str(grouping_cols)
    +                   select_grps = _grp_from_table(sssp_table,glist) + " , "
    +
    +           plpy.execute("""
    +                   CREATE TABLE {path_table} AS
    +                           SELECT {select_grps} ''::text as path
    +                           FROM {sssp_table}
    +                           LIMIT 0
    +                   """.format(**locals()))
    +
    +           plan = """ PREPARE {plan_name} (int) AS
    +                   SELECT parent FROM {sssp_table}
    +                   WHERE {vertex_id} = $1 AND parent IS NOT NULL {gsql} 
LIMIT 1
    +                   """
    +
    +           if grouping_cols == "NULL":
    +                   plpy.execute(plan.format(**locals()))
    +                   path_flag = 
_path_helper(plan_name,dest_vertex,path_table,"")
     
    -   if parent.nrows() == 0:
    -           plpy.error(
    -                   "Graph SSSP: Vertex {0} is not present in the sssp 
table {1}".
    -                   format(dest_vertex,sssp_table))
    -
    -   while 1:
    -           parent = parent[0]['parent']
    -           if parent == cur:
    -                   ret.reverse()
    -                   return ret
                else:
    -                   ret.append(parent)
    -                   cur = parent
    -           parent = plpy.execute(sql.format(**locals()))
    +                   gvalues = plpy.execute(
    +                           """ SELECT {grouping_cols} FROM {sssp_table}
    +                                   GROUP BY 
{grouping_cols}""".format(**locals()))
    +
    +                   for i in gvalues:
    +                           cur = dest_vertex
    +                           gcheck = []
    +                           gset = []
    +                           for j in glist:
    +                                   gcheck.append("{0} = 
{1}".format(j,i[j]))
    +                                   gset.append("{0}".format(i[j]))
    +                           gsql = " AND " + " AND ".join(gcheck)
    --- End diff --
    
    Try using a list comprehension here as well; it is supposed
    to be faster than repeated calls to append.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to