[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15267901#comment-15267901
 ] 

ASF GitHub Bot commented on FLINK-2044:
---------------------------------------

Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61831147
  
    --- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
    + * hubs and authorities. A good hub represents a page that points to 
many other pages, and a good authority
    + * represents a page that is linked to by many different hubs. The 
implementation assumes that the two values on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be 
provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a 
job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, 
DataSet<Vertex<K, Double>>> {
    +
    +   public static enum HITSParameter {
    +           HUB,
    +           AUTHORITY
    +   }
    +
    +   private int maxIterations;
    +   private long numberOfVertices;
    +
    +   /**
    +    * Creates an instance of HITS algorithm.
    +    * If the number of vertices of the input graph is known,
    +    * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, 
HITSParameter)} constructor instead.
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @param hitsParameter the type of final web pages users want to get 
by this algorithm
    +    */
    +   public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +           if (hitsParameter == HITSParameter.AUTHORITY) {
    +                   this.maxIterations = maxIterations * 2;
    +           } else {
    +                   this.maxIterations = maxIterations * 2 + 1;
    +           }
    +   }
    +
    +   /**
    +    * Creates an instance of HITS algorithm.
    +    * If the number of vertices of the input graph is unknown,
    +    * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} 
constructor instead.
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @param hitsParameter the type of final web pages users want to get 
by this algorithm
    +    */
    +   public HITSAlgorithm(int maxIterations, long numberOfVertices, 
HITSParameter hitsParameter) {
    +           if (hitsParameter == HITSParameter.AUTHORITY) {
    +                   this.maxIterations = maxIterations * 2;
    +           } else {
    +                   this.maxIterations = maxIterations * 2 + 1;
    +           }
    +           this.numberOfVertices = numberOfVertices;
    +   }
    +
    +   @Override
    +   public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> 
netGraph) throws Exception {
    +           if (this.numberOfVertices == 0) {
    +                   this.numberOfVertices = netGraph.numberOfVertices();
    +           }
    +
    +           ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
    +           parameter.setDirection(EdgeDirection.ALL);
    +           parameter.registerAggregator("sumAllValue", new 
DoubleSumAggregator());
    +
    +           return netGraph.runScatterGatherIteration(new 
VertexUpdate<K>(maxIterations),
    +                           new MessageUpdate<K>(maxIterations), 
maxIterations, parameter).getVertices();
    +   }
    +
    +   /**
    +    * Function that updates the value of a vertex by summing up the partial
    +    * values from all messages and normalize the value.
    +    */
    +   @SuppressWarnings("serial")
    +   public static final class VertexUpdate<K> extends 
VertexUpdateFunction<K, Double, Double> {
    +           private int maxIteration;
    +           private DoubleSumAggregator doubleSumAggregator;
    +
    +           public VertexUpdate(int maxIteration) {
    +                   this.maxIteration = maxIteration;
    +           }
    +
    +           @Override
    +           public void preSuperstep() {
    +                   doubleSumAggregator = 
getIterationAggregator("sumAllValue");
    +           }
    +
    +           @Override
    +           public void updateVertex(Vertex<K, Double> vertex, 
MessageIterator<Double> inMessages) {
    +                   double updateValue = 0;
    +
    +                   for (double element : inMessages) {
    +                           if (getSuperstepNumber() == maxIteration) {
    +                                   updateValue = element;
    +                                   break;
    +                           }
    +                           updateValue += element;
    +                   }
    +
    +                   if (getSuperstepNumber() != maxIteration) {
    +                           setNewVertexValue(updateValue);
    +                           doubleSumAggregator.aggregate(updateValue);
    +                   } else {
    +                           setNewVertexValue(vertex.getValue() / 
updateValue);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Distributes the value of a vertex among all neighbor vertices and 
sum all the
    +    * value in every superstep.
    +    */
    +   @SuppressWarnings("serial")
    +   public static final class MessageUpdate<K> extends MessagingFunction<K, 
Double, Double, Double> {
    +           private int maxIteration;
    +
    +           public MessageUpdate(int maxIteration) {
    +                   this.maxIteration = maxIteration;
    +           }
    +
    +           @Override
    +           public void sendMessages(Vertex<K, Double> vertex) {
    +                   for (Edge<K, Double> edge : getEdges()) {
    +                           if (getSuperstepNumber() % 2 == 1) {
    --- End diff --
    
    Yes, the logic here is a little mixed up. I will refactor it.


> Implementation of Gelly HITS Algorithm
> --------------------------------------
>
>                 Key: FLINK-2044
>                 URL: https://issues.apache.org/jira/browse/FLINK-2044
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>            Reporter: Ahamd Javid
>            Assignee: GaoLun
>            Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to