[ 
https://issues.apache.org/jira/browse/MADLIB-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965176#comment-15965176
 ] 

ASF GitHub Bot commented on MADLIB-1082:
----------------------------------------

Github user orhankislal commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/112#discussion_r111041205
  
    --- Diff: src/ports/postgres/modules/graph/pagerank.py_in ---
    @@ -158,44 +313,198 @@ def pagerank(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
             # https://en.wikipedia.org/wiki/PageRank#Damping_factor
     
             # The query below computes the PageRank of each node using the 
above formula.
    +        # A small explanatory note on ignore_group_clause:
    +        # This is used only when grouping is set. This essentially will 
have
    +        # the condition that will help skip the PageRank computation on 
groups
    +        # that have converged.
             plpy.execute("""
                     CREATE TABLE {message} AS
    -                SELECT {edge_temp_table}.{dest} AS {vertex_id},
    -                        
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_prob} AS 
pagerank
    +                SELECT {grouping_cols_select} {edge_temp_table}.{dest} AS 
{vertex_id},
    +                        
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob}
 AS pagerank
                     FROM {edge_temp_table}
    -                    INNER JOIN {cur} ON 
{edge_temp_table}.{dest}={cur}.{vertex_id}
    -                    INNER JOIN {out_cnts} ON 
{out_cnts}.{vertex_id}={edge_temp_table}.{src}
    -                    INNER JOIN {cur} AS {v1} ON 
{v1}.{vertex_id}={edge_temp_table}.{src}
    -                GROUP BY {edge_temp_table}.{dest}
    -            """.format(**locals()))
    +                    INNER JOIN {cur} ON {cur_join_clause}
    +                    INNER JOIN {out_cnts} ON {out_cnts_join_clause}
    +                    INNER JOIN {cur} AS {v1} ON {v1_join_clause}
    +                    {vertices_per_group_inner_join}
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {edge_temp_table}.{dest}
    +            """.format(grouping_cols_select=edge_grouping_cols_select+', '
    +                    if grouping_cols else '',
    +                
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                vertices_per_group_inner_join="""INNER JOIN 
{vertices_per_group}
    +                    AS {vpg} ON {vpg_join_clause}""".format(**locals())
    +                    if grouping_cols else '',
    +                ignore_group_clause=' WHERE '+get_ignore_groups(
    +                    summary_table, edge_temp_table, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
             # If there are nodes that have no incoming edges, they are not 
captured in the message table.
             # Insert entries for such nodes, with random_prob.
             plpy.execute("""
                     INSERT INTO {message}
    -                SELECT {vertex_id}, {random_prob}::DOUBLE PRECISION AS 
pagerank
    -                FROM {cur}
    -                WHERE {vertex_id} NOT IN (
    +                SELECT {grouping_cols_select} {cur}.{vertex_id}, 
{random_jump_prob} AS pagerank
    +                FROM {cur} {vpg_from_clause}
    +                WHERE {vpg_where_clause} {vertex_id} NOT IN (
                         SELECT {vertex_id}
                         FROM {message}
    +                    {message_grp_where}
                     )
    -            """.format(**locals()))
    -        # Check for convergence will be done as part of grouping support 
for pagerank:
    -        # https://issues.apache.org/jira/browse/MADLIB-1082. So, the 
threshold parameter
    -        # is a dummy variable at the moment, the PageRank computation 
happens for
    -        # {max_iter} number of times.
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {cur}.{vertex_id}
    +            """.format(grouping_cols_select=cur_grouping_cols_select+','
    +                    if grouping_cols else '',
    +                vpg_from_clause=', {vertices_per_group} AS 
{vpg}'.format(**locals())
    +                    if grouping_cols else '',
    +                vpg_where_clause='{vpg_cur_join_clause} AND 
'.format(**locals())
    +                    if grouping_cols else '',
    +                message_grp_where='WHERE {message_grp}'.format(**locals())
    +                    if grouping_cols else '',
    +                
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                ignore_group_clause=' AND '+get_ignore_groups(
    +                    summary_table, cur, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
    +
    +        # Check for convergence:
    +        ## Check for convergence only if threshold != 0.
    +        if threshold != 0:
    +            # message_unconv and cur_unconv will contain the unconverged 
groups
    +            # after current # and previous iterations respectively. Groups 
that
    +            # are missing in message_unconv but appear in cur_unconv are 
the
    +            # groups that have converged after this iteration's 
computations.
    +            # If no grouping columns are specified, then we check if there 
is
    +            # at least one unconverged node (limit 1 is used in the query).
    +            plpy.execute("""
    +                    CREATE TEMP TABLE {message_unconv} AS
    +                    SELECT {grouping_cols_select}
    +                    FROM {message}
    +                    INNER JOIN {cur}
    +                    ON {cur}.{vertex_id}={message}.{vertex_id}
    +                    WHERE {message_grp_clause}
    +                        ABS({cur}.pagerank-{message}.pagerank) > 
{threshold}
    +                    {ignore_group_clause}
    +                    {group_by_grouping_cols}
    +                    {limit}
    +                """.format(grouping_cols_select=cur_grouping_cols_select
    +                        if grouping_cols else '{0}.{1}'.format(cur, 
vertex_id),
    +                    group_by_grouping_cols=' GROUP BY 
{0}'.format(cur_grouping_cols_select)
    +                        if grouping_cols else '',
    +                    message_grp_clause='{0} AND '.format(message_grp)
    +                        if grouping_cols else '',
    +                    ignore_group_clause=' AND 
'+get_ignore_groups(summary_table, cur,
    +                        grouping_cols_list) if iteration_num>0 and 
grouping_cols else '',
    +                    limit='' if grouping_cols else ' LIMIT 1 ',
    +                    **locals()))
    +            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
    +                """.format(message_unconv))[0]["cnt"]
    +            if iteration_num > 0 and grouping_cols:
    +                # Update result and summary tables for groups that have 
converged
    +                # since the last iteration.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, message, grouping_cols_list,
    +                    cur_unconv, message_unconv)
    +            plpy.execute("""
    +                DROP TABLE IF EXISTS {cur_unconv};
    +                ALTER TABLE {message_unconv} RENAME TO {cur_unconv}
    +                """.format(**locals()))
    +        else:
    +            # Do not run convergence test if threshold=0, since that 
implies
    +            # the user wants to run max_iter iterations.
    +            unconverged = 1
             plpy.execute("""
    -                DROP TABLE IF EXISTS {cur};
    -                ALTER TABLE {message} RENAME TO {cur}
    -            """.format(**locals()))
    +                    DROP TABLE IF EXISTS {cur};
    +                    ALTER TABLE {message} RENAME TO {cur}
    +                """.format(**locals()))
    +        if unconverged == 0:
    +            break
     
    -    plpy.execute("ALTER TABLE {cur} RENAME TO 
{out_table}".format(**locals()))
    +    # If there still are some unconverged groups/(entire table), update 
results.
    +    if grouping_cols:
    +        if unconverged > 0:
    +            if threshold != 0:
    +                # We completed max_iters, but there are still some 
unconverged groups
    +                # Update the result and summary tables for unconverged 
groups.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, 
cur_unconv)
    +            else:
    +                # No group has converged. List of all group values are in
    +                # distinct_grp_table.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, 
distinct_grp_table)
    +    else:
    +        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
    +            """.format(table_name=cur, **locals()))
    +        plpy.execute("""
    +                INSERT INTO {summary_table} VALUES
    +                ({iteration_num}+1);
    +            """.format(**locals()))
     
         ## Step 4: Cleanup
    -    plpy.execute("""
    -        DROP TABLE IF EXISTS {0},{1},{2},{3};
    -        """.format(out_cnts, edge_temp_table, cur, message))
    +    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
    +        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
    +                    message_unconv))
    +    if grouping_cols:
    +        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
    +            """.format(vertices_per_group, temp_summary_table,
    +                distinct_grp_table))
         plpy.execute("SET client_min_messages TO %s" % old_msg_level)
     
    +def update_result_tables(temp_summary_table, i, summary_table, out_table,
    +    res_table, grouping_cols_list, cur_unconv, message_unconv=None):
    +    """
    +        This function updates the summary and output tables only for thouse
    +        groups that have converged. This is found out by looking at groups
    +        that appear in cur_unvonv but not in message_unconv.
    +        If this function is called after max_iter is completed, without
    +        convergence, all the unconverged groups from cur_unconv is used
    +        (note that message_unconv is renamed to cur_unconv before checking
    +        for unconverged==0 in the pagerank function's for loop)
    +    """
    +    if message_unconv is None:
    +        plpy.execute("""
    +            DROP TABLE IF EXISTS {temp_summary_table};
    +            CREATE TABLE {temp_summary_table} AS
    --- End diff --
    
    We can overwrite the temp_summary_table variable instead of creating a new 
table (as long as we make sure not to drop it at the end).


> Graph - add grouping to page rank
> ---------------------------------
>
>                 Key: MADLIB-1082
>                 URL: https://issues.apache.org/jira/browse/MADLIB-1082
>             Project: Apache MADlib
>          Issue Type: Improvement
>          Components: Module: Graph
>            Reporter: Frank McQuillan
>            Assignee: Nandish Jayaram
>            Priority: Minor
>             Fix For: v1.11
>
>
> Add grouping column to edge table to support separate page rank calculations 
> by group



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to