[ https://issues.apache.org/jira/browse/MADLIB-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965176#comment-15965176 ]
ASF GitHub Bot commented on MADLIB-1082: ---------------------------------------- Github user orhankislal commented on a diff in the pull request: https://github.com/apache/incubator-madlib/pull/112#discussion_r111041205 --- Diff: src/ports/postgres/modules/graph/pagerank.py_in --- @@ -158,44 +313,198 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, # https://en.wikipedia.org/wiki/PageRank#Damping_factor # The query below computes the PageRank of each node using the above formula. + # A small explanatory note on ignore_group_clause: + # This is used only when grouping is set. This essentially will have + # the condition that will help skip the PageRank computation on groups + # that have converged. plpy.execute(""" CREATE TABLE {message} AS - SELECT {edge_temp_table}.{dest} AS {vertex_id}, - SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_prob} AS pagerank + SELECT {grouping_cols_select} {edge_temp_table}.{dest} AS {vertex_id}, + SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank FROM {edge_temp_table} - INNER JOIN {cur} ON {edge_temp_table}.{dest}={cur}.{vertex_id} - INNER JOIN {out_cnts} ON {out_cnts}.{vertex_id}={edge_temp_table}.{src} - INNER JOIN {cur} AS {v1} ON {v1}.{vertex_id}={edge_temp_table}.{src} - GROUP BY {edge_temp_table}.{dest} - """.format(**locals())) + INNER JOIN {cur} ON {cur_join_clause} + INNER JOIN {out_cnts} ON {out_cnts_join_clause} + INNER JOIN {cur} AS {v1} ON {v1_join_clause} + {vertices_per_group_inner_join} + {ignore_group_clause} + GROUP BY {grouping_cols_select} {edge_temp_table}.{dest} + """.format(grouping_cols_select=edge_grouping_cols_select+', ' + if grouping_cols else '', + random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals()) + if grouping_cols else random_probability, + vertices_per_group_inner_join="""INNER JOIN {vertices_per_group} + AS {vpg} ON {vpg_join_clause}""".format(**locals()) + if grouping_cols else '', + 
ignore_group_clause=' WHERE '+get_ignore_groups( + summary_table, edge_temp_table, grouping_cols_list) + if iteration_num>0 and grouping_cols else '', + **locals())) # If there are nodes that have no incoming edges, they are not captured in the message table. # Insert entries for such nodes, with random_prob. plpy.execute(""" INSERT INTO {message} - SELECT {vertex_id}, {random_prob}::DOUBLE PRECISION AS pagerank - FROM {cur} - WHERE {vertex_id} NOT IN ( + SELECT {grouping_cols_select} {cur}.{vertex_id}, {random_jump_prob} AS pagerank + FROM {cur} {vpg_from_clause} + WHERE {vpg_where_clause} {vertex_id} NOT IN ( SELECT {vertex_id} FROM {message} + {message_grp_where} ) - """.format(**locals())) - # Check for convergence will be done as part of grouping support for pagerank: - # https://issues.apache.org/jira/browse/MADLIB-1082. So, the threshold parameter - # is a dummy variable at the moment, the PageRank computation happens for - # {max_iter} number of times. + {ignore_group_clause} + GROUP BY {grouping_cols_select} {cur}.{vertex_id} + """.format(grouping_cols_select=cur_grouping_cols_select+',' + if grouping_cols else '', + vpg_from_clause=', {vertices_per_group} AS {vpg}'.format(**locals()) + if grouping_cols else '', + vpg_where_clause='{vpg_cur_join_clause} AND '.format(**locals()) + if grouping_cols else '', + message_grp_where='WHERE {message_grp}'.format(**locals()) + if grouping_cols else '', + random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals()) + if grouping_cols else random_probability, + ignore_group_clause=' AND '+get_ignore_groups( + summary_table, cur, grouping_cols_list) + if iteration_num>0 and grouping_cols else '', + **locals())) + + # Check for convergence: + ## Check for convergence only if threshold != 0. + if threshold != 0: + # message_unconv and cur_unconv will contain the unconverged groups + # after current # and previous iterations respectively. 
Groups that + # are missing in message_unconv but appear in cur_unconv are the + # groups that have converged after this iteration's computations. + # If no grouping columns are specified, then we check if there is + # at least one unconverged node (limit 1 is used in the query). + plpy.execute(""" + CREATE TEMP TABLE {message_unconv} AS + SELECT {grouping_cols_select} + FROM {message} + INNER JOIN {cur} + ON {cur}.{vertex_id}={message}.{vertex_id} + WHERE {message_grp_clause} + ABS({cur}.pagerank-{message}.pagerank) > {threshold} + {ignore_group_clause} + {group_by_grouping_cols} + {limit} + """.format(grouping_cols_select=cur_grouping_cols_select + if grouping_cols else '{0}.{1}'.format(cur, vertex_id), + group_by_grouping_cols=' GROUP BY {0}'.format(cur_grouping_cols_select) + if grouping_cols else '', + message_grp_clause='{0} AND '.format(message_grp) + if grouping_cols else '', + ignore_group_clause=' AND '+get_ignore_groups(summary_table, cur, + grouping_cols_list) if iteration_num>0 and grouping_cols else '', + limit='' if grouping_cols else ' LIMIT 1 ', + **locals())) + unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0} + """.format(message_unconv))[0]["cnt"] + if iteration_num > 0 and grouping_cols: + # Update result and summary tables for groups that have converged + # since the last iteration. + update_result_tables(temp_summary_table, iteration_num, + summary_table, out_table, message, grouping_cols_list, + cur_unconv, message_unconv) + plpy.execute(""" + DROP TABLE IF EXISTS {cur_unconv}; + ALTER TABLE {message_unconv} RENAME TO {cur_unconv} + """.format(**locals())) + else: + # Do not run convergence test if threshold=0, since that implies + # the user wants to run max_iter iterations. 
+ unconverged = 1 plpy.execute(""" - DROP TABLE IF EXISTS {cur}; - ALTER TABLE {message} RENAME TO {cur} - """.format(**locals())) + DROP TABLE IF EXISTS {cur}; + ALTER TABLE {message} RENAME TO {cur} + """.format(**locals())) + if unconverged == 0: + break - plpy.execute("ALTER TABLE {cur} RENAME TO {out_table}".format(**locals())) + # If there still are some unconverged groups/(entire table), update results. + if grouping_cols: + if unconverged > 0: + if threshold != 0: + # We completed max_iters, but there are still some unconverged groups + # Update the result and summary tables for unconverged groups. + update_result_tables(temp_summary_table, iteration_num, + summary_table, out_table, cur, grouping_cols_list, cur_unconv) + else: + # No group has converged. List of all group values are in + # distinct_grp_table. + update_result_tables(temp_summary_table, iteration_num, + summary_table, out_table, cur, grouping_cols_list, distinct_grp_table) + else: + plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table} + """.format(table_name=cur, **locals())) + plpy.execute(""" + INSERT INTO {summary_table} VALUES + ({iteration_num}+1); + """.format(**locals())) ## Step 4: Cleanup - plpy.execute(""" - DROP TABLE IF EXISTS {0},{1},{2},{3}; - """.format(out_cnts, edge_temp_table, cur, message)) + plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5}; + """.format(out_cnts, edge_temp_table, cur, message, cur_unconv, + message_unconv)) + if grouping_cols: + plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}; + """.format(vertices_per_group, temp_summary_table, + distinct_grp_table)) plpy.execute("SET client_min_messages TO %s" % old_msg_level) +def update_result_tables(temp_summary_table, i, summary_table, out_table, + res_table, grouping_cols_list, cur_unconv, message_unconv=None): + """ + This function updates the summary and output tables only for those + groups that have converged.
This is found out by looking at groups + that appear in cur_unconv but not in message_unconv. + If this function is called after max_iter is completed, without + convergence, all the unconverged groups from cur_unconv are used + (note that message_unconv is renamed to cur_unconv before checking + for unconverged==0 in the pagerank function's for loop) + """ + if message_unconv is None: + plpy.execute(""" + DROP TABLE IF EXISTS {temp_summary_table}; + CREATE TABLE {temp_summary_table} AS --- End diff -- We can overwrite the temp_summary_table variable instead of creating a new table (as long as we make sure not to drop it at the end). > Graph - add grouping to page rank > --------------------------------- > > Key: MADLIB-1082 > URL: https://issues.apache.org/jira/browse/MADLIB-1082 > Project: Apache MADlib > Issue Type: Improvement > Components: Module: Graph > Reporter: Frank McQuillan > Assignee: Nandish Jayaram > Priority: Minor > Fix For: v1.11 > > > Add grouping column to edge table to support separate page rank calculations > by group -- This message was sent by Atlassian JIRA (v6.3.15#6346)