GitHub user orhankislal commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/112#discussion_r111033110
  
    --- Diff: src/ports/postgres/modules/graph/pagerank.py_in ---
    @@ -158,44 +313,198 @@ def pagerank(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
             # https://en.wikipedia.org/wiki/PageRank#Damping_factor
     
             # The query below computes the PageRank of each node using the 
above formula.
    +        # A small explanatory note on ignore_group_clause:
    +        # This is used only when grouping is set. This essentially will 
have
    +        # the condition that will help skip the PageRank computation on 
groups
    +        # that have converged.
             plpy.execute("""
                     CREATE TABLE {message} AS
    -                SELECT {edge_temp_table}.{dest} AS {vertex_id},
    -                        
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_prob} AS 
pagerank
    +                SELECT {grouping_cols_select} {edge_temp_table}.{dest} AS 
{vertex_id},
    +                        
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob}
 AS pagerank
                     FROM {edge_temp_table}
    -                    INNER JOIN {cur} ON 
{edge_temp_table}.{dest}={cur}.{vertex_id}
    -                    INNER JOIN {out_cnts} ON 
{out_cnts}.{vertex_id}={edge_temp_table}.{src}
    -                    INNER JOIN {cur} AS {v1} ON 
{v1}.{vertex_id}={edge_temp_table}.{src}
    -                GROUP BY {edge_temp_table}.{dest}
    -            """.format(**locals()))
    +                    INNER JOIN {cur} ON {cur_join_clause}
    +                    INNER JOIN {out_cnts} ON {out_cnts_join_clause}
    +                    INNER JOIN {cur} AS {v1} ON {v1_join_clause}
    +                    {vertices_per_group_inner_join}
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {edge_temp_table}.{dest}
    +            """.format(grouping_cols_select=edge_grouping_cols_select+', '
    +                    if grouping_cols else '',
    +                
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                vertices_per_group_inner_join="""INNER JOIN 
{vertices_per_group}
    +                    AS {vpg} ON {vpg_join_clause}""".format(**locals())
    +                    if grouping_cols else '',
    +                ignore_group_clause=' WHERE '+get_ignore_groups(
    +                    summary_table, edge_temp_table, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
             # If there are nodes that have no incoming edges, they are not 
captured in the message table.
             # Insert entries for such nodes, with random_prob.
             plpy.execute("""
                     INSERT INTO {message}
    -                SELECT {vertex_id}, {random_prob}::DOUBLE PRECISION AS 
pagerank
    -                FROM {cur}
    -                WHERE {vertex_id} NOT IN (
    +                SELECT {grouping_cols_select} {cur}.{vertex_id}, 
{random_jump_prob} AS pagerank
    +                FROM {cur} {vpg_from_clause}
    +                WHERE {vpg_where_clause} {vertex_id} NOT IN (
                         SELECT {vertex_id}
                         FROM {message}
    +                    {message_grp_where}
                     )
    -            """.format(**locals()))
    -        # Check for convergence will be done as part of grouping support 
for pagerank:
    -        # https://issues.apache.org/jira/browse/MADLIB-1082. So, the 
threshold parameter
    -        # is a dummy variable at the moment, the PageRank computation 
happens for
    -        # {max_iter} number of times.
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {cur}.{vertex_id}
    +            """.format(grouping_cols_select=cur_grouping_cols_select+','
    +                    if grouping_cols else '',
    +                vpg_from_clause=', {vertices_per_group} AS 
{vpg}'.format(**locals())
    +                    if grouping_cols else '',
    +                vpg_where_clause='{vpg_cur_join_clause} AND 
'.format(**locals())
    +                    if grouping_cols else '',
    +                message_grp_where='WHERE {message_grp}'.format(**locals())
    +                    if grouping_cols else '',
    +                
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                ignore_group_clause=' AND '+get_ignore_groups(
    +                    summary_table, cur, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
    +
    +        # Check for convergence:
    +        ## Check for convergence only if threshold != 0.
    +        if threshold != 0:
    +            # message_unconv and cur_unconv will contain the unconverged 
groups
    +            # after current # and previous iterations respectively. Groups 
that
    +            # are missing in message_unconv but appear in cur_unconv are 
the
    +            # groups that have converged after this iteration's 
computations.
    +            # If no grouping columns are specified, then we check if there 
is
    +            # at least one unconverged node (limit 1 is used in the query).
    +            plpy.execute("""
    +                    CREATE TEMP TABLE {message_unconv} AS
    +                    SELECT {grouping_cols_select}
    +                    FROM {message}
    +                    INNER JOIN {cur}
    +                    ON {cur}.{vertex_id}={message}.{vertex_id}
    +                    WHERE {message_grp_clause}
    +                        ABS({cur}.pagerank-{message}.pagerank) > 
{threshold}
    +                    {ignore_group_clause}
    +                    {group_by_grouping_cols}
    +                    {limit}
    +                """.format(grouping_cols_select=cur_grouping_cols_select
    +                        if grouping_cols else '{0}.{1}'.format(cur, 
vertex_id),
    +                    group_by_grouping_cols=' GROUP BY 
{0}'.format(cur_grouping_cols_select)
    +                        if grouping_cols else '',
    +                    message_grp_clause='{0} AND '.format(message_grp)
    +                        if grouping_cols else '',
    +                    ignore_group_clause=' AND 
'+get_ignore_groups(summary_table, cur,
    +                        grouping_cols_list) if iteration_num>0 and 
grouping_cols else '',
    +                    limit='' if grouping_cols else ' LIMIT 1 ',
    +                    **locals()))
    +            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
    +                """.format(message_unconv))[0]["cnt"]
    +            if iteration_num > 0 and grouping_cols:
    +                # Update result and summary tables for groups that have 
converged
    +                # since the last iteration.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, message, grouping_cols_list,
    +                    cur_unconv, message_unconv)
    +            plpy.execute("""
    +                DROP TABLE IF EXISTS {cur_unconv};
    +                ALTER TABLE {message_unconv} RENAME TO {cur_unconv}
    +                """.format(**locals()))
    +        else:
    +            # Do not run convergence test if threshold=0, since that 
implies
    +            # the user wants to run max_iter iterations.
    +            unconverged = 1
             plpy.execute("""
    -                DROP TABLE IF EXISTS {cur};
    -                ALTER TABLE {message} RENAME TO {cur}
    -            """.format(**locals()))
    +                    DROP TABLE IF EXISTS {cur};
    +                    ALTER TABLE {message} RENAME TO {cur}
    +                """.format(**locals()))
    +        if unconverged == 0:
    +            break
     
    -    plpy.execute("ALTER TABLE {cur} RENAME TO 
{out_table}".format(**locals()))
    +    # If there still are some unconverged groups/(entire table), update 
results.
    +    if grouping_cols:
    +        if unconverged > 0:
    +            if threshold != 0:
    +                # We completed max_iters, but there are still some 
unconverged groups
    +                # Update the result and summary tables for unconverged 
groups.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, 
cur_unconv)
    +            else:
    +                # No group has converged. List of all group values are in
    +                # distinct_grp_table.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, 
distinct_grp_table)
    +    else:
    +        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
    +            """.format(table_name=cur, **locals()))
    +        plpy.execute("""
    +                INSERT INTO {summary_table} VALUES
    +                ({iteration_num}+1);
    +            """.format(**locals()))
     
         ## Step 4: Cleanup
    -    plpy.execute("""
    -        DROP TABLE IF EXISTS {0},{1},{2},{3};
    -        """.format(out_cnts, edge_temp_table, cur, message))
    +    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
    +        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
    +                    message_unconv))
    +    if grouping_cols:
    +        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
    +            """.format(vertices_per_group, temp_summary_table,
    +                distinct_grp_table))
         plpy.execute("SET client_min_messages TO %s" % old_msg_level)
     
    +def update_result_tables(temp_summary_table, i, summary_table, out_table,
    +    res_table, grouping_cols_list, cur_unconv, message_unconv=None):
    +    """
    +        This function updates the summary and output tables only for thouse
    --- End diff --
    
    thouse -> those
    
    I think this explanation is somewhat unclear as an introduction to the 
function. You might want to move the explanation of the `if` check into the code itself. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to