Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/178#discussion_r136631234
  
    --- Diff: src/ports/postgres/modules/graph/hits.py_in ---
    @@ -0,0 +1,439 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +# HITS
    +
    +# Please refer to the hits.sql_in file for the documentation
    +
    +"""
    +@file hits.py_in
    +
    +@namespace graph
    +"""
    +
    +import math
    +import plpy
    +from utilities.control import MinWarning
    +from utilities.utilities import _assert
    +from utilities.utilities import add_postfix
    +from utilities.utilities import extract_keyvalue_params
    +from utilities.utilities import unique_string
    +from utilities.utilities import is_platform_pg
    +
    +from graph_utils import *
    +
    +
    +def validate_hits_args(schema_madlib, vertex_table, vertex_id, edge_table,
    +                        edge_params, out_table, max_iter, threshold):
    +    """
    +    Function to validate input parameters for HITS
    +    """
    +    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
    +                          out_table, 'HITS')
    +    _assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
    +            "HITS: Invalid threshold value ({0}), must be between 0 and 
1.".
    +            format(threshold))
    +    _assert(max_iter > 0,
    +            """HITS: Invalid max_iter value ({0}), must be a positive 
integer.""".
    +            format(max_iter))
    +
    +def hits(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
    +             out_table, max_iter, threshold, **kwargs):
    +    """
    +    Function that computes the HITS scores
    +
    +    Args:
    +        @param vertex_table
    +        @param vertex_id
    +        @param edge_table
    +        @param source_vertex
    +        @param dest_vertex
    +        @param out_table
    +        @param max_iter
    +        @param threshold
    +    """
    +    with MinWarning('warning'):
    +        params_types = {'src': str, 'dest': str}
    +        default_args = {'src': 'src', 'dest': 'dest'}
    +        edge_params = extract_keyvalue_params(edge_args, params_types, 
default_args)
    +
    +        # populate default values for optional params if null
    +        if max_iter is None:
    +            max_iter = 100
    +        if not vertex_id:
    +            vertex_id = "id"
    +
    +        validate_hits_args(schema_madlib, vertex_table, vertex_id, 
edge_table,
    +                               edge_params, out_table, max_iter, threshold)
    +        summary_table = add_postfix(out_table, "_summary")
    +        _assert(not table_exists(summary_table),
    +                "Graph HITS: Output summary table ({summary_table}) 
already exists."
    +                .format(**locals()))
    +
    +        src = edge_params["src"]
    +        dest = edge_params["dest"]
    +        n_vertices = plpy.execute("""
    +                    SELECT COUNT({0}) AS cnt
    +                    FROM {1}
    +                """.format(vertex_id, vertex_table))[0]["cnt"]
    +
    +        # Assign default threshold value based on number of nodes in the 
graph.
    +        if threshold is None:
    +            threshold = 1.0 / (n_vertices * 1000)
    +
    +        edge_temp_table = unique_string(desp='temp_edge')
    +        distribution = ('' if is_platform_pg() else
    +                        "DISTRIBUTED BY ({0})".format(dest))
    +        plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
    +        plpy.execute("""
    +            CREATE TEMP TABLE {edge_temp_table} AS
    +                SELECT * FROM {edge_table}
    +                {distribution}
    +            """.format(**locals()))
    +
    +        # GPDB and HAWQ have distributed by clauses to help them with 
indexing.
    +        # For Postgres we add the index explicitly.
    +        if is_platform_pg():
    +            plpy.execute("CREATE INDEX ON 
{0}({1})".format(edge_temp_table, dest))
    +
    +        # Intermediate tables required.
    +        cur = unique_string(desp='cur')
    +        message = unique_string(desp='message')
    +        v1 = unique_string(desp='v1')
    +        message_unconv_authority = 
unique_string(desp='message_unconv_authority')
    +        message_unconv_hub = unique_string(desp="message_unconv_hub")
    +        tmp = unique_string(desp='tmp')
    +        tmp2 = unique_string(desp='tmp2')
    +        tmp3 = unique_string(desp='tmp3')
    +        v2 = unique_string(desp='v2')
    +
    +        nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
    +
    +        if is_platform_pg():
    +            cur_distribution = cnts_distribution = ''
    +        else:
    +            cur_distribution = cnts_distribution = \
    +                "DISTRIBUTED BY ({0})".format(vertex_id)
    +        cur_join_clause = "{edge_temp_table}.{dest} = 
{cur}.{vertex_id}".format(**locals())
    +        v1_join_clause = "{v1}.{vertex_id} = 
{edge_temp_table}.{src}".format(**locals())
    +
    +        authority_init_value = 1.0
    +        hub_init_value = 1.0
    +        plpy.execute("""
    +                CREATE TEMP TABLE {cur} AS
    +                SELECT {vertex_id}, {authority_init_value}::DOUBLE 
PRECISION AS authority,
    +                {hub_init_value}::DOUBLE PRECISION AS hub
    +                FROM {vertex_table}
    +                {cur_distribution}
    +            """.format(**locals()))
    +
    +        # The summary table contains the total number of iterations.
    +        plpy.execute("""
    +                CREATE TABLE {summary_table} (
    +                    __iterations__ INTEGER
    +                )
    +            """.format(**locals()))
    +
    +        # Find all nodes in the graph that don't have any incoming edges 
and
    +        # assign 0 as their authority and hub values.
    +        no_incoming_node_authority_value = 0.0
    +        no_incoming_node_hub_value = 0.0
    +        plpy.execute("""
    +                CREATE TABLE {nodes_with_no_incoming_edges} AS
    +                SELECT DISTINCT({src}), 
{no_incoming_node_authority_value}::DOUBLE PRECISION AS authority,
    +                {no_incoming_node_hub_value}::DOUBLE PRECISION AS hub
    +                FROM {edge_temp_table}
    +                EXCEPT
    +                    (SELECT DISTINCT({dest}), 
{no_incoming_node_authority_value}::DOUBLE PRECISION AS authority,
    +                    {no_incoming_node_hub_value}::DOUBLE PRECISION AS hub
    +                    FROM {edge_temp_table})
    +            """.format(**locals()))
    +
    +        unconverged_authority_num = 0
    +        unconverged_hub_num = 0
    +        iteration_num = 0
    +        authority_norm = 0
    +        hub_norm = 0
    +
    +        plpy.execute("""
    +                    CREATE TABLE {message_unconv_authority} AS
    +                    SELECT {cur}.{vertex_id}
    +                    FROM {cur}
    +                    LIMIT 0
    +                    """.format(**locals()))
    +        plpy.execute("""
    +                    CREATE TABLE {message_unconv_hub} AS
    +                    SELECT {cur}.{vertex_id}
    +                    FROM {cur}
    +                    LIMIT 0
    +                    """.format(**locals()))
    +
    +        for iteration_num in range(max_iter):
    +        
##########################################################################
    +        # HITS scores for node 'A' at any given iteration 'i' is 
calculated as following:
    +        # authority_i(A) = hub_i(B) + hub_i(C) + ...
    +        # After calculating authority score for all nodes, a normalization 
will 
    +        # be done for authority_i(A), authority_i(B), authority_i(C), ... 
    +        # Then
    +        # hub_i(A) = authority_i(B) + authority_i(C) + ... 
    +        # After calculating hub score for all nodes, a normalization will 
    +        # be done for hub_i(A), hub_i(B), hub_i(C), ...
    +        # B, C are nodes that have edges that point to node A
    +        
#########################################################################
    +
    +            
#####################################################################
    +            # calculate authority 
    +            
#####################################################################
    +            plpy.execute("""
    +                    CREATE TABLE {message} AS
    +                    SELECT {edge_temp_table}.{dest} AS {vertex_id},
    +                            SUM({v1}.hub) AS authority,
    +                            {cur}.hub AS hub
    +                    FROM {edge_temp_table}
    +                        INNER JOIN {cur} ON {cur_join_clause}
    +                        INNER JOIN {cur} AS {v1} ON {v1_join_clause}
    +                    GROUP BY {edge_temp_table}.{dest}, {cur}.hub
    +                    {cur_distribution}
    +                """.format(**locals()))
    +
    +            #normalize authority value, since points with 0 authority 
score are not included
    +            # in the message table, norm value will always be positive 
value
    +            authority_norm = math.sqrt(plpy.execute("""
    +                SELECT SUM({tmp}.authority_square) AS sum_norm_square
    +                FROM
    +                (SELECT POWER(authority, 2) AS authority_square
    +                FROM {message}) AS {tmp}
    +                """.format(**locals()))[0]["sum_norm_square"])
    +            plpy.execute("""
    +                UPDATE {message}
    +                SET authority = authority/{authority_norm}
    +                """.format(**locals()))
    +
    +            #####################################################
    +            #calculate hub
    +            #####################################################
    +            message_join_clause = "{edge_temp_table}.{dest} = 
{message}.{vertex_id}".format(**locals())
    +            v2_join_clause = "{v2}.{vertex_id} = 
{edge_temp_table}.{src}".format(**locals())
    +            plpy.execute("""
    +                    UPDATE {message}
    +                    SET hub = {tmp2}.hub
    +                    FROM
    +                    (SELECT {edge_temp_table}.{dest} AS {vertex_id},
    +                            SUM({v2}.authority) AS hub, 
{message}.authority AS authority
    +                    FROM {edge_temp_table}
    +                        INNER JOIN {message} ON {message_join_clause}
    +                        INNER JOIN {message} AS {v2} ON {v2_join_clause}
    +                    GROUP BY {edge_temp_table}.{dest},{message}.authority) 
AS {tmp2}
    +                    WHERE {tmp2}.{vertex_id} = {message}.{vertex_id}
    +                """.format(**locals()))
    +
    +            #normalize hub value, since points with 0 hub score are not 
included
    +            # in the message table, norm value will always be positive 
value
    +            hub_norm = math.sqrt(plpy.execute("""
    +            SELECT SUM({tmp3}.hub_square) AS sum_hub_square
    +            FROM
    +            (SELECT POWER(hub, 2) AS hub_square
    +            FROM {message}) AS {tmp3}
    +            """.format(**locals()))[0]["sum_hub_square"])
    +
    +            plpy.execute("""
    +            UPDATE {message}
    +            SET hub = hub/{hub_norm}
    +            """.format(**locals()))
    +
    +            # Check for convergence:
    +            # Check for convergence only if threshold != 0.
    +            if threshold != 0:
    +                # message_unconv and cur_unconv will contain the 
unconverged groups
    +                # after current # and previous iterations respectively. we 
check if there is
    +                # at least one unconverged node (limit 1 is used in the 
query).
    +                limit = ' LIMIT 1 '
    +                plpy.execute("""
    +                        TRUNCATE TABLE {message_unconv_authority};
    +                        INSERT INTO {message_unconv_authority}
    +                        SELECT {cur}.{vertex_id}
    +                        FROM {message}
    +                        INNER JOIN {cur}
    +                        ON {cur}.{vertex_id}={message}.{vertex_id}
    +                        WHERE ABS({cur}.authority-{message}.authority) > 
{threshold}
    +                        {limit}
    +                    """.format(**locals()))
    +                unconverged_authority_num = plpy.execute("""SELECT 
COUNT(*) AS cnt FROM {0}
    +                    """.format(message_unconv_authority))[0]["cnt"]
    +
    +                plpy.execute("""
    +                        TRUNCATE TABLE {message_unconv_hub};
    +                        INSERT INTO {message_unconv_hub}
    +                        SELECT {cur}.{vertex_id}
    +                        FROM {message}
    +                        INNER JOIN {cur}
    +                        ON {cur}.{vertex_id}={message}.{vertex_id}
    +                        WHERE ABS({cur}.hub-{message}.hub) > {threshold}
    +                        {limit}
    +                    """.format(**locals()))
    +                unconverged_hub_num = plpy.execute("""SELECT COUNT(*) AS 
cnt FROM {0}
    +                    """.format(message_unconv_hub))[0]["cnt"]
    +            else:
    +                # Do not run convergence test if threshold=0, since that 
implies
    +                # the user wants to run max_iter iterations.
    +                unconverged_authority_num = 1
    +                unconverged_hub_num = 1
    +
    +            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
    +            plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
    +                    """.format(**locals()))
    +
    +            if unconverged_authority_num == 0 and unconverged_hub_num == 0:
    +                break
    +
    +        plpy.execute("""
    +            ALTER TABLE {table_name}
    +            RENAME TO {out_table}
    +            """.format(table_name=cur, **locals()))
    +        # If there are nodes that have no incoming edges, they are not
    +        # captured in the previous calculation. The authority and hub 
value should
    +        # both be 0 for those nodes. Insert entries for such nodes with 
authority=0 
    +        # and hub=0 into out_table.
    +        plpy.execute("""
    +                INSERT INTO {out_table}
    +                SELECT *
    +                FROM {nodes_with_no_incoming_edges}
    +            """.format(**locals()))
    +        #update summary_table
    +        plpy.execute("""
    +            INSERT INTO {summary_table} 
    +            VALUES({iteration_num}+1)
    +            """.format(**locals()))
    +
    +        # Cleanup All the intermediate tables
    +        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5}
    +            """.format(edge_temp_table, cur, message,
    +                       message_unconv_authority, message_unconv_hub, 
nodes_with_no_incoming_edges))
    +
    +def hits_help(schema_madlib, message, **kwargs):
    +    """
    +    Help function for hits
    +
    +    Args:
    +        @param schema_madlib
    +        @param message: string, Help message string
    +        @param kwargs
    +
    +    Returns:
    +        String. Help/usage information
    +    """
    +    if message is not None and \
    +            message.lower() in ("usage", "help", "?"):
    +        help_string = "Get from method below"
    +        help_string = get_graph_usage(schema_madlib, 'HITS',
    +                                      """out_table     TEXT, -- Name of 
the output table for HITS
    +    max_iter      INTEGER, -- Maximum iteration number (DEFAULT = 100)
    +    threshold     DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 
1/(N*1000),
    +                                    -- N is number of vertices in the 
graph)
    +""") + """
    +
    +A summary table is also created that contains information regarding the
    +number of iterations required for convergence. It is named by adding the
    +suffix '_summary' to the 'out_table' parameter.
    +"""
    +    else:
    +        if message is not None and \
    +                message.lower() in ("example", "examples"):
    +            help_string = """
    
+----------------------------------------------------------------------------
    +                                EXAMPLES
    
+----------------------------------------------------------------------------
    +-- Create a graph, represented as vertex and edge tables.
    +DROP TABLE IF EXISTS vertex, edge;
    +CREATE TABLE vertex(
    +        id INTEGER
    +        );
    +CREATE TABLE edge(
    +        src INTEGER,
    +        dest INTEGER,
    +        user_id INTEGER
    +        );
    +INSERT INTO vertex VALUES
    +(0),
    +(1),
    +(2),
    +(3),
    +(4),
    +(5),
    +(6);
    +INSERT INTO edge VALUES
    +(0, 1, 1),
    +(0, 2, 1),
    +(0, 4, 1),
    +(1, 2, 1),
    +(1, 3, 1),
    +(2, 3, 1),
    +(2, 5, 1),
    +(2, 6, 1),
    +(3, 0, 1),
    +(4, 0, 1),
    +(5, 6, 1),
    +(6, 3, 1),
    +(0, 1, 2),
    +(0, 2, 2),
    +(0, 4, 2),
    +(1, 2, 2),
    +(1, 3, 2),
    +(2, 3, 2),
    +(3, 0, 2),
    +(4, 0, 2),
    +(5, 6, 2),
    +(6, 3, 2);
    +
    +-- Compute the HITS score:
    +DROP TABLE IF EXISTS hits_out, hits_out_summary;
    +SELECT madlib.hits(
    --- End diff --
    
    `madlib`->`{schema_madlib}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to