http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/pagerank.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in
index 4ef5876..95bc4b4 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -32,36 +32,38 @@ from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
+from utilities.utilities import is_platform_pg
+
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from graph_utils import *
-m4_changequote(`<!', `!>')
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, damping_factor, max_iter, threshold,
- grouping_cols_list, module_name):
+ edge_params, out_table, damping_factor, max_iter,
+ threshold, grouping_cols_list):
"""
Function to validate input parameters for PageRank
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, module_name)
+ out_table, 'PageRank')
_assert(damping_factor >= 0.0 and damping_factor <= 1.0,
- """PageRank: Invalid damping factor value ({0}), must be between 0 and
1.""".
- format(damping_factor))
+ "PageRank: Invalid damping factor value ({0}), must be between 0
and 1.".
+ format(damping_factor))
_assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
- """PageRank: Invalid threshold value ({0}), must be between 0 and
1.""".
- format(threshold))
+ "PageRank: Invalid threshold value ({0}), must be between 0 and
1.".
+ format(threshold))
_assert(max_iter > 0,
- """PageRank: Invalid max_iter value ({0}), must be a positive
integer.""".
- format(max_iter))
+ """PageRank: Invalid max_iter value ({0}), must be a positive
integer.""".
+ format(max_iter))
if grouping_cols_list:
# validate the grouping columns. We currently only support grouping_cols
# to be column names in the edge_table, and not expressions!
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
"PageRank error: One or more grouping columns specified do not exist!")
+
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
- out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
+ out_table, damping_factor, max_iter, threshold, grouping_cols,
+ **kwargs):
"""
Function that computes the PageRank
@@ -76,458 +78,451 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
@param max_iter
@param threshold
"""
- old_msg_level = plpy.execute("""
- SELECT setting
- FROM pg_settings
- WHERE name='client_min_messages'
- """)[0]['setting']
- plpy.execute('SET client_min_messages TO warning')
- params_types = {'src': str, 'dest': str}
- default_args = {'src': 'src', 'dest': 'dest'}
- edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
-
- # populate default values for optional params if null
- if damping_factor is None:
- damping_factor = 0.85
- if max_iter is None:
- max_iter = 100
- if vertex_id is None:
- vertex_id = "id"
- if not grouping_cols:
- grouping_cols = ''
-
- grouping_cols_list = split_quoted_delimited_str(grouping_cols)
- validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, damping_factor, max_iter, threshold,
- grouping_cols_list, 'PageRank')
- summary_table = out_table + "_summary"
- _assert(not table_exists(summary_table),
- "Graph PageRank: Output summary table ({summary_table}) already
exists."
- .format(**locals()))
- src = edge_params["src"]
- dest = edge_params["dest"]
- nvertices = plpy.execute("""
- SELECT COUNT({0}) AS cnt
- FROM {1}
- """.format(vertex_id, vertex_table))[0]["cnt"]
- # A fixed threshold value, of say 1e-5, might not work well when the
- # number of vertices is a billion, since the initial pagerank value
- # of all nodes would then be 1/1e9. So, assign default threshold
- # value based on number of nodes in the graph.
- # NOTE: The heuristic below is not based on any scientific evidence.
- if threshold is None:
- threshold = 1.0/(nvertices*1000)
-
- # table/column names used when grouping_cols is set.
- distinct_grp_table = ''
- vertices_per_group = ''
- vpg = ''
- grouping_where_clause = ''
- group_by_clause = ''
- random_prob = ''
-
- edge_temp_table = unique_string(desp='temp_edge')
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', dest)!>)
- plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
- plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
- SELECT * FROM {edge_table}
- {distribution}
- """.format(**locals()))
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the index explicitly.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!"""CREATE INDEX ON {edge_temp_table} ({src});
- """.format(**locals())!>,
- <!''!>)
- plpy.execute(sql_index)
-
- # Intermediate tables required.
- cur = unique_string(desp='cur')
- message = unique_string(desp='message')
- cur_unconv = unique_string(desp='cur_unconv')
- message_unconv = unique_string(desp='message_unconv')
- out_cnts = unique_string(desp='out_cnts')
- out_cnts_cnt = unique_string(desp='cnt')
- v1 = unique_string(desp='v1')
-
- cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', vertex_id)!>)
- cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', vertex_id)!>)
- cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
- """.format(**locals())
- out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
- """.format(**locals())
- v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
- """.format(**locals())
-
- random_probability = (1.0-damping_factor)/nvertices
- ######################################################################
- # Create several strings that will be used to construct required
- # queries. These strings will be required only during grouping.
- random_jump_prob = random_probability
- ignore_group_clause_first = ''
- limit = ' LIMIT 1 '
-
- grouping_cols_select_pr = ''
- vertices_per_group_inner_join_pr = ''
- ignore_group_clause_pr= ''
-
- grouping_cols_select_ins = ''
- vpg_from_clause_ins = ''
- vpg_where_clause_ins = ''
- message_grp_where_ins = ''
- ignore_group_clause_ins = ''
-
- nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
- ignore_group_clause_ins_noincoming = ''
-
- grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
- group_by_grouping_cols_conv = ''
- message_grp_clause_conv = ''
- ignore_group_clause_conv = ''
- ######################################################################
-
- # Queries when groups are involved need a lot more conditions in
- # various clauses, so populating the required variables. Some intermediate
- # tables are unnecessary when no grouping is involved, so create some
- # tables and certain columns only during grouping.
- if grouping_cols:
- distinct_grp_table = unique_string(desp='grp')
- plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
- plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
- SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
- """.format(**locals()))
- vertices_per_group = unique_string(desp='nvert_grp')
- init_pr = unique_string(desp='init')
- random_prob = unique_string(desp='rand')
- subq = unique_string(desp='subquery')
- rand_damp = 1-damping_factor
- grouping_where_clause = ' AND '.join(
- [distinct_grp_table+'.'+col+'='+subq+'.'+col
- for col in grouping_cols_list])
- group_by_clause = ', '.join([distinct_grp_table+'.'+col
- for col in grouping_cols_list])
- # Find number of vertices in each group, this is the normalizing factor
- # for computing the random_prob
- plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
- plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
- SELECT {distinct_grp_table}.*,
- 1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
- {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
- FROM {distinct_grp_table} INNER JOIN (
- SELECT {grouping_cols}, {src} AS __vertices__
- FROM {edge_temp_table}
- UNION
- SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
- ){subq}
- ON {grouping_where_clause}
- GROUP BY {group_by_clause}
- """.format(**locals()))
-
- grouping_where_clause = ' AND '.join(
- [vertices_per_group+'.'+col+'='+subq+'.'+col
- for col in grouping_cols_list])
- group_by_clause = ', '.join([vertices_per_group+'.'+col
- for col in grouping_cols_list])
+ with MinWarning('warning'):
+ params_types = {'src': str, 'dest': str}
+ default_args = {'src': 'src', 'dest': 'dest'}
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+ # populate default values for optional params if null
+ if damping_factor is None:
+ damping_factor = 0.85
+ if max_iter is None:
+ max_iter = 100
+ if vertex_id is None:
+ vertex_id = "id"
+ if not grouping_cols:
+ grouping_cols = ''
+
+ grouping_cols_list = split_quoted_delimited_str(grouping_cols)
+ validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
+ edge_params, out_table, damping_factor,
+ max_iter, threshold, grouping_cols_list)
+ summary_table = out_table + "_summary"
+ _assert(not table_exists(summary_table),
+ "Graph PageRank: Output summary table ({summary_table})
already exists."
+ .format(**locals()))
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ n_vertices = plpy.execute("""
+ SELECT COUNT({0}) AS cnt
+ FROM {1}
+ """.format(vertex_id, vertex_table))[0]["cnt"]
+ # A fixed threshold value, of say 1e-5, might not work well when the
+ # number of vertices is a billion, since the initial pagerank value
+ # of all nodes would then be 1/1e9. So, assign default threshold
+ # value based on number of nodes in the graph.
+ # NOTE: The heuristic below is not based on any scientific evidence.
+ if threshold is None:
+ threshold = 1.0 / (n_vertices * 1000)
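+ # For example (illustrative numbers): a graph with 100 vertices gets
+ # a default threshold of 1.0/(100*1000) = 1e-5, while a billion-vertex
+ # graph gets 1e-12, well below its initial per-node pagerank of 1e-9.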
+
+ # table/column names used when grouping_cols is set.
+ distinct_grp_table = ''
+ vertices_per_group = ''
+ vpg = ''
+ grouping_where_clause = ''
+ group_by_clause = ''
+ random_prob = ''
+
+ edge_temp_table = unique_string(desp='temp_edge')
+ grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
+ distribution = ('' if is_platform_pg() else
+ "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma,
dest))
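+ # For illustration (hypothetical values): grouping_cols='g1' and
+ # dest='dest_id' would render this as "DISTRIBUTED BY (g1,dest_id)";
+ # on Postgres the clause is simply ''.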
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
plpy.execute("""
- CREATE TEMP TABLE {cur} AS
- SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
- {init_pr} AS pagerank
- FROM {vertices_per_group} INNER JOIN (
- SELECT {grouping_cols}, {src} AS __vertices__
- FROM {edge_temp_table}
- UNION
- SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
- ){subq}
- ON {grouping_where_clause}
- {cur_distribution}
- """.format(**locals()))
- vpg = unique_string(desp='vpg')
- # Compute the out-degree of every node in the group-based subgraphs.
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
- plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
- SELECT {grouping_cols_select} {src} AS {vertex_id},
- COUNT({dest}) AS {out_cnts_cnt}
- FROM {edge_temp_table}
- GROUP BY {grouping_cols_select} {src}
- {cnts_distribution}
- """.format(grouping_cols_select=grouping_cols+','
- if grouping_cols else '', **locals()))
-
- message_grp = ' AND '.join(
- ["{cur}.{col}={message}.{col}".format(**locals())
- for col in grouping_cols_list])
- cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
- for col in grouping_cols_list])
- out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
- for col in grouping_cols_list])
- v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
- for col in grouping_cols_list])
- vpg_join_clause = ' AND '.join(
- ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
- for col in grouping_cols_list])
- vpg_t1_join_clause = ' AND '.join(
- ["__t1__.{col}={vpg}.{col}".format(**locals())
- for col in grouping_cols_list])
- # join clause specific to populating random_prob for nodes without any
- # incoming edges.
- edge_grouping_cols_select = ', '.join(
- ["{edge_temp_table}.{col}".format(**locals())
- for col in grouping_cols_list])
- cur_grouping_cols_select = ', '.join(
- ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
- # Create output summary table:
- cols_names_types = get_cols_and_types(edge_table)
- grouping_cols_clause = ', '.join([c_name+" "+c_type
- for (c_name, c_type) in cols_names_types
- if c_name in grouping_cols_list])
- plpy.execute("""
- CREATE TABLE {summary_table} (
- {grouping_cols_clause},
- __iterations__ INTEGER
- )
- """.format(**locals()))
- # Create output table. This will be updated whenever a group converges
- # Note that vertex_id is assumed to be an integer (as described in
- # documentation)
- plpy.execute("""
- CREATE TABLE {out_table} (
- {grouping_cols_clause},
- {vertex_id} INTEGER,
- pagerank DOUBLE PRECISION
- )
- """.format(**locals()))
- temp_summary_table = unique_string(desp='temp_summary')
- plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
- plpy.execute("""
- CREATE TABLE {temp_summary_table} (
- {grouping_cols_clause}
- )
+ CREATE TEMP TABLE {edge_temp_table} AS
+ SELECT * FROM {edge_table}
+ {distribution}
""".format(**locals()))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the index explicitly.
+ if is_platform_pg():
+ plpy.execute("CREATE INDEX ON {0}({1})".format(edge_temp_table,
src))
+
+ # Intermediate tables required.
+ cur = unique_string(desp='cur')
+ message = unique_string(desp='message')
+ cur_unconv = unique_string(desp='cur_unconv')
+ message_unconv = unique_string(desp='message_unconv')
+ out_cnts = unique_string(desp='out_cnts')
+ out_cnts_cnt = unique_string(desp='cnt')
+ v1 = unique_string(desp='v1')
+
+ if is_platform_pg():
+ cur_distribution = cnts_distribution = ''
+ else:
+ cur_distribution = cnts_distribution = \
+ "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma,
vertex_id)
+ cur_join_clause = "{edge_temp_table}.{dest} =
{cur}.{vertex_id}".format(**locals())
+ out_cnts_join_clause = "{out_cnts}.{vertex_id} =
{edge_temp_table}.{src}".format(**locals())
+ v1_join_clause = "{v1}.{vertex_id} =
{edge_temp_table}.{src}".format(**locals())
+
+ random_probability = (1.0 - damping_factor) / n_vertices
+ ######################################################################
+ # Create several strings that will be used to construct required
+ # queries. These strings will be required only during grouping.
+ random_jump_prob = random_probability
+ ignore_group_clause_first = ''
+ limit = ' LIMIT 1 '
+
+ grouping_cols_select_pr = ''
+ vertices_per_group_inner_join_pr = ''
+ ignore_group_clause_pr = ''
+
+ grouping_cols_select_ins = ''
+ vpg_from_clause_ins = ''
+ vpg_where_clause_ins = ''
+ message_grp_where_ins = ''
+ ignore_group_clause_ins = ''
+
+ nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
+ ignore_group_clause_ins_noincoming = ''
+
+ grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
+ group_by_grouping_cols_conv = ''
+ message_grp_clause_conv = ''
+ ignore_group_clause_conv = ''
######################################################################
- # Strings required for the main PageRank computation query
- grouping_cols_select_pr = edge_grouping_cols_select+', '
- random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
- vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
- AS {vpg} ON {vpg_join_clause}""".format(**locals())
- ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table,
- edge_temp_table, grouping_cols_list)
- ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups(
- summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
- # Strings required for updating PageRank scores of vertices that have
- # no incoming edges
- grouping_cols_select_ins = cur_grouping_cols_select+','
- vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
- **locals())
- vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
- **locals())
- message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
- ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
- cur, grouping_cols_list)
- # Strings required for convergence test query
- grouping_cols_select_conv = cur_grouping_cols_select
- group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
- cur_grouping_cols_select)
- message_grp_clause_conv = '{0} AND '.format(message_grp)
- ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
- cur, grouping_cols_list)
- limit = ''
-
- # Find all nodes, in each group, that have no incoming edges. The PageRank
- # values of these nodes are not updated using the first query in the
- # following for loop. They must be explicitly plugged back into the
- # message table, with their corresponding group's random_prob as their
- # PageRank values.
- plpy.execute("""
- CREATE TABLE {nodes_with_no_incoming_edges} AS
- SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
- {vpg}.{random_prob} AS pagerank
- FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
- WHERE NOT EXISTS (
- SELECT 1
- FROM {edge_temp_table} AS __t2__
- WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
- ) {vpg_where_clause_ins}
- """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
- for col in grouping_cols_list]),
- where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
- for col in grouping_cols_list]),
- **locals()))
- else:
- # cur and out_cnts tables can be simpler when no grouping is involved.
- init_value = 1.0/nvertices
- plpy.execute("""
- CREATE TEMP TABLE {cur} AS
- SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
- FROM {vertex_table}
- {cur_distribution}
- """.format(**locals()))
- # Compute the out-degree of every node in the graph.
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
- plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
- SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
- FROM {edge_temp_table}
- GROUP BY {src}
- {cnts_distribution}
- """.format(**locals()))
- # The summary table when there is no grouping will contain only
- # the iteration column. We don't need to create the out_table
- # when no grouping is used since the 'cur' table will be renamed
- # to out_table after pagerank computation is completed.
- plpy.execute("""
- CREATE TABLE {summary_table} (
- __iterations__ INTEGER
- )
- """.format(**locals()))
- # Find all nodes in the graph that don't have any incoming edges and
- # assign random_prob as their pagerank values.
- plpy.execute("""
- CREATE TABLE {nodes_with_no_incoming_edges} AS
- SELECT DISTINCT({src}), {random_probability} AS pagerank
- FROM {edge_temp_table}
- EXCEPT
- (SELECT DISTINCT({dest}), {random_probability} AS pagerank
- FROM {edge_temp_table})
- """.format(**locals()))
- unconverged = 0
- iteration_num = 0
- for iteration_num in range(max_iter):
- #####################################################################
- # PageRank for node 'A' at any given iteration 'i' is given by:
- # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
- # PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
- # where 'N' is the number of vertices in the graph,
- # B, C are nodes that have edges to node A, and
- # degree(node) represents the number of outgoing edges from 'node'
- #####################################################################
- # Essentially, the pagerank for a node is based on an aggregate of a
- # fraction of the pagerank values of all the nodes that have incoming
- # edges to it, along with a small random probability.
- # More information can be found at:
- # https://en.wikipedia.org/wiki/PageRank#Damping_factor
-
- # The query below computes the PageRank of each node using the above
- # formula. A small explanatory note on ignore_group_clause:
- # This is used only when grouping is set. This essentially will have
- # the condition that will help skip the PageRank computation on groups
- # that have converged.
- plpy.execute("""
- CREATE TABLE {message} AS
- SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
- SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+ # Queries when groups are involved need a lot more conditions in
+ # various clauses, so populating the required variables. Some intermediate
+ # tables are unnecessary when no grouping is involved, so create some
+ # tables and certain columns only during grouping.
+ if grouping_cols:
+ distinct_grp_table = unique_string(desp='grp')
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+ plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
+ SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
+ """.format(**locals()))
+ vertices_per_group = unique_string(desp='nvert_grp')
+ init_pr = unique_string(desp='init')
+ random_prob = unique_string(desp='rand')
+ subq = unique_string(desp='subquery')
+ rand_damp = 1 - damping_factor
+ grouping_where_clause = ' AND '.join(
+ [distinct_grp_table + '.' + col + '=' + subq + '.' + col
+ for col in grouping_cols_list])
+ group_by_clause = ', '.join([distinct_grp_table + '.' + col
+ for col in grouping_cols_list])
+ # Find number of vertices in each group, this is the normalizing factor
+ # for computing the random_prob
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
+ plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
+ SELECT {distinct_grp_table}.*,
+ 1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
+ {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
+ FROM {distinct_grp_table} INNER JOIN (
+ SELECT {grouping_cols}, {src} AS __vertices__
+ FROM {edge_temp_table}
+ UNION
+ SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+ ){subq}
+ ON {grouping_where_clause}
+ GROUP BY {group_by_clause}
+ """.format(**locals()))
+
+ grouping_where_clause = ' AND '.join(
+ [vertices_per_group + '.' + col + '=' + subq + '.' + col
+ for col in grouping_cols_list])
+ group_by_clause = ', '.join([vertices_per_group + '.' + col
+ for col in grouping_cols_list])
+ plpy.execute("""
+ CREATE TEMP TABLE {cur} AS
+ SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
+ {init_pr} AS pagerank
+ FROM {vertices_per_group} INNER JOIN (
+ SELECT {grouping_cols}, {src} AS __vertices__
+ FROM {edge_temp_table}
+ UNION
+ SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+ ){subq}
+ ON {grouping_where_clause}
+ {cur_distribution}
+ """.format(**locals()))
+ vpg = unique_string(desp='vpg')
+ # Compute the out-degree of every node in the group-based subgraphs.
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+ plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+ SELECT {grouping_cols_select} {src} AS {vertex_id},
+ COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_temp_table}
- INNER JOIN {cur} ON {cur_join_clause}
- INNER JOIN {out_cnts} ON {out_cnts_join_clause}
- INNER JOIN {cur} AS {v1} ON {v1_join_clause}
- {vertices_per_group_inner_join_pr}
- {ignore_group_clause}
- GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
- {cur_distribution}
- """.format(ignore_group_clause=ignore_group_clause_pr
- if iteration_num>0 else ignore_group_clause_first,
- **locals()))
- # If there are nodes that have no incoming edges, they are not
- # captured in the message table. Insert entries for such nodes,
- # with random_prob.
- plpy.execute("""
- INSERT INTO {message}
- SELECT *
- FROM {nodes_with_no_incoming_edges}
- {ignore_group_clause}
- """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
- if iteration_num>0 else ignore_group_clause_first,
- **locals()))
- # Check for convergence:
- ## Check for convergence only if threshold != 0.
- if threshold != 0:
- # message_unconv and cur_unconv will contain the unconverged groups
- # after current and previous iterations respectively. Groups that
- # are missing in message_unconv but appear in cur_unconv are the
- # groups that have converged after this iteration's computations.
- # If no grouping columns are specified, then we check if there is
- # at least one unconverged node (limit 1 is used in the query).
+ GROUP BY {grouping_cols_select} {src}
+ {cnts_distribution}
+ """.format(grouping_cols_select=grouping_cols + ','
+ if grouping_cols else '', **locals()))
+
+ message_grp = ' AND '.join(
+ ["{cur}.{col}={message}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ vpg_join_clause = ' AND '.join(
+ ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ vpg_t1_join_clause = ' AND '.join(
+ ["__t1__.{col}={vpg}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ # join clause specific to populating random_prob for nodes without any
+ # incoming edges.
+ edge_grouping_cols_select = ', '.join(
+ ["{edge_temp_table}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ cur_grouping_cols_select = ', '.join(
+ ["{cur}.{col}".format(**locals()) for col in
grouping_cols_list])
+ # Create output summary table:
+ cols_names_types = get_cols_and_types(edge_table)
+ grouping_cols_clause = ', '.join([c_name + " " + c_type
+ for (c_name, c_type) in cols_names_types
+ if c_name in grouping_cols_list])
plpy.execute("""
- CREATE TEMP TABLE {message_unconv} AS
- SELECT {grouping_cols_select_conv}
- FROM {message}
- INNER JOIN {cur}
- ON {cur}.{vertex_id}={message}.{vertex_id}
- WHERE {message_grp_clause_conv}
- ABS({cur}.pagerank-{message}.pagerank) > {threshold}
- {ignore_group_clause}
- {group_by_grouping_cols_conv}
- {limit}
- """.format(ignore_group_clause=ignore_group_clause_ins
- if iteration_num>0 else ignore_group_clause_conv,
- **locals()))
- unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
- """.format(message_unconv))[0]["cnt"]
- if iteration_num > 0 and grouping_cols:
- # Update result and summary tables for groups that have
- # converged
- # since the last iteration.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, message, grouping_cols_list,
- cur_unconv, message_unconv)
- plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
- plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
- {cur_unconv} """.format(**locals()))
+ CREATE TABLE {summary_table} (
+ {grouping_cols_clause},
+ __iterations__ INTEGER
+ )
+ """.format(**locals()))
+ # Create output table. This will be updated whenever a group converges
+ # Note that vertex_id is assumed to be an integer (as described in
+ # documentation)
+ plpy.execute("""
+ CREATE TABLE {out_table} (
+ {grouping_cols_clause},
+ {vertex_id} INTEGER,
+ pagerank DOUBLE PRECISION
+ )
+ """.format(**locals()))
+ temp_summary_table = unique_string(desp='temp_summary')
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
+ plpy.execute("""
+ CREATE TABLE {temp_summary_table} (
+ {grouping_cols_clause}
+ )
+ """.format(**locals()))
+ ######################################################################
+ # Strings required for the main PageRank computation query
+ grouping_cols_select_pr = edge_grouping_cols_select + ', '
+ random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
+ vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
+ AS {vpg} ON {vpg_join_clause}""".format(**locals())
+ ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(summary_table,
+ edge_temp_table, grouping_cols_list)
+ ignore_group_clause_ins_noincoming = ' WHERE ' + get_ignore_groups(
+ summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
+ # Strings required for updating PageRank scores of vertices that have
+ # no incoming edges
+ grouping_cols_select_ins = cur_grouping_cols_select + ','
+ vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
+ **locals())
+ vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
+ **locals())
+ message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
+ ignore_group_clause_ins = ' AND ' + get_ignore_groups(summary_table,
+ cur, grouping_cols_list)
+ # Strings required for convergence test query
+ grouping_cols_select_conv = cur_grouping_cols_select
+ group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
+ cur_grouping_cols_select)
+ message_grp_clause_conv = '{0} AND '.format(message_grp)
+ ignore_group_clause_conv = ' AND ' + get_ignore_groups(summary_table,
+ cur, grouping_cols_list)
+ limit = ''
+
+ # Find all nodes, in each group, that have no incoming edges. The PageRank
+ # values of these nodes are not updated using the first query in the
+ # following for loop. They must be explicitly plugged back into the
+ # message table, with their corresponding group's random_prob as their
+ # PageRank values.
+ plpy.execute("""
+ CREATE TABLE {nodes_with_no_incoming_edges} AS
+ SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
+ {vpg}.{random_prob} AS pagerank
+ FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM {edge_temp_table} AS __t2__
+ WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
+ ) {vpg_where_clause_ins}
+ """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
+ for col in grouping_cols_list]),
+ where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
+ for col in grouping_cols_list]),
+ **locals()))
else:
- # Do not run convergence test if threshold=0, since that implies
- # the user wants to run max_iter iterations.
- unconverged = 1
- plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
- plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+ # cur and out_cnts tables can be simpler when no grouping is involved.
+ init_value = 1.0 / n_vertices
+ plpy.execute("""
+ CREATE TEMP TABLE {cur} AS
+ SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
+ FROM {vertex_table}
+ {cur_distribution}
""".format(**locals()))
- if unconverged == 0:
- break
- # If there still are some unconverged groups/(entire table),
- # update results.
- if grouping_cols:
- if unconverged > 0:
+ # Compute the out-degree of every node in the graph.
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+ plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+ SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
+ FROM {edge_temp_table}
+ GROUP BY {src}
+ {cnts_distribution}
+ """.format(**locals()))
+ # The summary table when there is no grouping will contain only
+ # the iteration column. We don't need to create the out_table
+ # when no grouping is used since the 'cur' table will be renamed
+ # to out_table after pagerank computation is completed.
+ plpy.execute("""
+ CREATE TABLE {summary_table} (
+ __iterations__ INTEGER
+ )
+ """.format(**locals()))
+ # Find all nodes in the graph that don't have any incoming edges and
+ # assign random_prob as their pagerank values.
+ plpy.execute("""
+ CREATE TABLE {nodes_with_no_incoming_edges} AS
+ SELECT DISTINCT({src}), {random_probability} AS pagerank
+ FROM {edge_temp_table}
+ EXCEPT
+ (SELECT DISTINCT({dest}), {random_probability} AS pagerank
+ FROM {edge_temp_table})
+ """.format(**locals()))
+ unconverged = 0
+ iteration_num = 0
+ for iteration_num in range(max_iter):
+ #####################################################################
+ # PageRank for node 'A' at any given iteration 'i' is given by:
+ # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
+ # PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
+ # where 'N' is the number of vertices in the graph,
+ # B, C are nodes that have edges to node A, and
+ # degree(node) represents the number of outgoing edges from 'node'
+ #####################################################################
+ # Essentially, the pagerank for a node is based on an aggregate of a
+ # fraction of the pagerank values of all the nodes that have incoming
+ # edges to it, along with a small random probability.
+ # More information can be found at:
+ # https://en.wikipedia.org/wiki/PageRank#Damping_factor
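+ # As a worked illustration (hypothetical numbers): in a 4-node graph
+ # with damping_factor=0.85, if only B (out-degree 2) and C (out-degree
+ # 1) link to A, and PR_i-1(B)=0.2, PR_i-1(C)=0.1, then
+ # PR_i(A) = 0.85*(0.2/2 + 0.1/1) + 0.15/4 = 0.2075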
+
+ # The query below computes the PageRank of each node using the above
+ # formula. A small explanatory note on ignore_group_clause:
+ # This is used only when grouping is set. This essentially will have
+ # the condition that will help skip the PageRank computation on groups
+ # that have converged.
+ plpy.execute("""
+ CREATE TABLE {message} AS
+ SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
+ SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+ FROM {edge_temp_table}
+ INNER JOIN {cur} ON {cur_join_clause}
+ INNER JOIN {out_cnts} ON {out_cnts_join_clause}
+ INNER JOIN {cur} AS {v1} ON {v1_join_clause}
+ {vertices_per_group_inner_join_pr}
+ {ignore_group_clause}
+ GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
+ {cur_distribution}
+ """.format(ignore_group_clause=ignore_group_clause_pr
+ if iteration_num > 0 else ignore_group_clause_first,
+ **locals()))
+ # If there are nodes that have no incoming edges, they are not
+ # captured in the message table. Insert entries for such nodes,
+ # with random_prob.
+ plpy.execute("""
+ INSERT INTO {message}
+ SELECT *
+ FROM {nodes_with_no_incoming_edges}
+ {ignore_group_clause}
+ """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
+ if iteration_num > 0 else ignore_group_clause_first,
+ **locals()))
+ # Check for convergence:
+ # Check for convergence only if threshold != 0.
if threshold != 0:
- # We completed max_iters, but there are still some unconverged
- # groups. Update the result and summary tables for unconverged
- # groups.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, cur, grouping_cols_list,
- cur_unconv)
+ # message_unconv and cur_unconv will contain the unconverged groups
+ # after current and previous iterations respectively. Groups that
+ # are missing in message_unconv but appear in cur_unconv are the
+ # groups that have converged after this iteration's computations.
+ # If no grouping columns are specified, then we check if there is
+ # at least one unconverged node (limit 1 is used in the query).
+ plpy.execute("""
+ CREATE TEMP TABLE {message_unconv} AS
+ SELECT {grouping_cols_select_conv}
+ FROM {message}
+ INNER JOIN {cur}
+ ON {cur}.{vertex_id}={message}.{vertex_id}
+ WHERE {message_grp_clause_conv}
+ ABS({cur}.pagerank-{message}.pagerank) > {threshold}
+ {ignore_group_clause}
+ {group_by_grouping_cols_conv}
+ {limit}
+ """.format(ignore_group_clause=ignore_group_clause_ins
+ if iteration_num > 0 else ignore_group_clause_conv,
+ **locals()))
+ unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
+ """.format(message_unconv))[0]["cnt"]
+ if iteration_num > 0 and grouping_cols:
+ # Update result and summary tables for groups that have
+ # converged
+ # since the last iteration.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, message, grouping_cols_list,
+ cur_unconv, message_unconv)
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
+ plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
+ {cur_unconv} """.format(**locals()))
else:
- # No group has converged. List of all group values are in
- # distinct_grp_table.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, cur, grouping_cols_list,
- distinct_grp_table)
- else:
- plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
- """.format(table_name=cur, **locals()))
- plpy.execute("""
+ # Do not run convergence test if threshold=0, since that implies
+ # the user wants to run max_iter iterations.
+ unconverged = 1
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
+ plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+ """.format(**locals()))
+ if unconverged == 0:
+ break
+
+ # If there still are some unconverged groups/(entire table),
+ # update results.
+ if grouping_cols:
+ if unconverged > 0:
+ if threshold != 0:
+ # We completed max_iters, but there are still some unconverged
+ # groups. Update the result and summary tables for unconverged
+ # groups.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
+ cur_unconv)
+ else:
+ # No group has converged. List of all group values are in
+ # distinct_grp_table.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
+ distinct_grp_table)
+ else:
+ plpy.execute("""
+ ALTER TABLE {table_name}
+ RENAME TO {out_table}
+ """.format(table_name=cur, **locals()))
+ plpy.execute("""
INSERT INTO {summary_table} VALUES
- ({iteration_num}+1);
- """.format(**locals()))
+ ({iteration_num}+1)
+ """.format(**locals()))
+
+ # Step 4: Cleanup
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
+ """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
+ message_unconv, nodes_with_no_incoming_edges))
+ if grouping_cols:
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
+ """.format(vertices_per_group, temp_summary_table,
+ distinct_grp_table))
- ## Step 4: Cleanup
- plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6};
- """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
- message_unconv, nodes_with_no_incoming_edges))
- if grouping_cols:
- plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
- """.format(vertices_per_group, temp_summary_table,
- distinct_grp_table))
- plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def update_result_tables(temp_summary_table, i, summary_table, out_table,
- res_table, grouping_cols_list, cur_unconv, message_unconv=None):
+ res_table, grouping_cols_list, cur_unconv,
+ message_unconv=None):
"""
This function updates the summary and output tables only for those
groups that have converged. This is found out by looking at groups
@@ -556,7 +551,7 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
FROM {cur_unconv}
WHERE {join_condition}
""".format(join_condition=get_ignore_groups(
- message_unconv, cur_unconv, grouping_cols_list), **locals()))
+ message_unconv, cur_unconv, grouping_cols_list), **locals()))
plpy.execute("""
INSERT INTO {summary_table}
SELECT *, {i}+1 AS __iteration__
@@ -569,21 +564,24 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
INNER JOIN {temp_summary_table}
ON {join_condition}
""".format(join_condition=' AND '.join(
- ["{res_table}.{col}={temp_summary_table}.{col}".format(
- **locals())
- for col in grouping_cols_list]), **locals()))
+ ["{res_table}.{col}={temp_summary_table}.{col}".format(
+ **locals())
+ for col in grouping_cols_list]), **locals()))
+
def get_ignore_groups(first_table, second_table, grouping_cols_list):
"""
This function generates the necessary clause to only select the
groups that appear in second_table and not in first_table.
"""
- return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
- {first_table}) """.format(second_table_cols=', '.join(
- ["{second_table}.{col}".format(**locals())
- for col in grouping_cols_list]),
- grouping_cols=', '.join([col for col in grouping_cols_list]),
- **locals())
+ second_table_cols = ', '.join(["{0}.{1}".format(second_table, col)
+ for col in grouping_cols_list])
+ grouping_cols = ', '.join([col for col in grouping_cols_list])
+ return """({second_table_cols}) NOT IN
+ (SELECT {grouping_cols}
+ FROM {first_table})
+ """.format(**locals())
+
def pagerank_help(schema_madlib, message, **kwargs):
"""
@@ -601,7 +599,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(schema_madlib, 'PageRank',
- """out_table TEXT, -- Name of the output table for PageRank
+ """out_table TEXT, -- Name of the
output table for PageRank
damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
-- (DEFAULT = 0.85)
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)