[ 
https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501354#comment-14501354
 ] 

ASF GitHub Bot commented on FLINK-1523:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643926
  
    --- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
    @@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, 
VertexValue>> inputData) {
                if (this.initialVertices == null) {
                        throw new IllegalStateException("The input data set has 
not been set.");
                }
    -           
    +
                // prepare some type information
    -           TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = 
initialVertices.getType();
                TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) 
initialVertices.getType()).getTypeAt(0);
                TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = 
new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
     
    -           final int[] zeroKeyPos = new int[] {0};
    -   
    -           final DeltaIteration<Vertex<VertexKey, VertexValue>, 
Vertex<VertexKey, VertexValue>> iteration =
    -                   this.initialVertices.iterateDelta(this.initialVertices, 
this.maximumNumberOfIterations, zeroKeyPos);
    +           // create a graph
    +           Graph<VertexKey, VertexValue, EdgeValue> graph =
    +                           Graph.fromDataSet(initialVertices, 
edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
     
    -           // set up the iteration operator
    -           if (this.configuration != null) {
    +           // check whether the numVertices option is set and, if so, 
compute the total number of vertices
    +           // and set it within the messaging and update functions
     
    -                   iteration.name(this.configuration.getName(
    -                                   "Vertex-centric iteration (" + 
updateFunction + " | " + messagingFunction + ")"));
    -                   
iteration.parallelism(this.configuration.getParallelism());
    -                   
iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
    -
    -                   // register all aggregators
    -                   for (Map.Entry<String, Aggregator<?>> entry : 
this.configuration.getAggregators().entrySet()) {
    -                           iteration.registerAggregator(entry.getKey(), 
entry.getValue());
    +           if (this.configuration != null && 
this.configuration.isOptNumVertices()) {
    +                   try {
    +                           long numberOfVertices = 
graph.numberOfVertices();
    +                           
messagingFunction.setNumberOfVertices(numberOfVertices);
    +                           
updateFunction.setNumberOfVertices(numberOfVertices);
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
                        }
                }
    -           else {
    -                   // no configuration provided; set default name
    -                   iteration.name("Vertex-centric iteration (" + 
updateFunction + " | " + messagingFunction + ")");
    -           }
    -           
    -           // build the messaging function (co group)
    -           CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
    -           MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, 
EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, 
Message, EdgeValue>(messagingFunction, messageTypeInfo);
    -           messages = 
this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
    -           
    -           // configure coGroup message function with name and broadcast 
variables
    -           messages = messages.name("Messaging");
     
    -           if (this.configuration != null) {
    -                   for (Tuple2<String, DataSet<?>> e : 
this.configuration.getMessagingBcastVars()) {
    -                           messages = messages.withBroadcastSet(e.f1, 
e.f0);
    -                   }                       
    +           if(this.configuration != null) {
    +                   
messagingFunction.setDirection(this.configuration.getDirection());
    +           } else {
    +                   messagingFunction.setDirection(EdgeDirection.OUT);
                }
    -           
    -           VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = 
new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, 
vertexTypes);
    -           
    -           // build the update function (co group)
    -           CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
    -                           
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
    -           
    -           // configure coGroup update function with name and broadcast 
variables
    -           updates = updates.name("Vertex State Updates");
     
    -           if (this.configuration != null) {
    -                   for (Tuple2<String, DataSet<?>> e : 
this.configuration.getUpdateBcastVars()) {
    -                           updates = updates.withBroadcastSet(e.f1, e.f0);
    -                   }                       
    -           }
    +           // retrieve the direction in which the updates are made and in 
which the messages are sent
    +           EdgeDirection messagingDirection = 
messagingFunction.getDirection();
     
    -           // let the operator know that we preserve the key field
    -           
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
    -           
    -           return iteration.closeWith(updates, updates);
    -           
    +           DataSet<Tuple2<VertexKey, Message>> messages = null;
    --- End diff --
    
    why create a null dataset and pass it as a parameter to the methods below?


> Vertex-centric iteration extensions
> -----------------------------------
>
>                 Key: FLINK-1523
>                 URL: https://issues.apache.org/jira/browse/FLINK-1523
>             Project: Flink
>          Issue Type: Improvement
>          Components: Gelly
>            Reporter: Vasia Kalavri
>            Assignee: Andra Lungu
>
> We would like to make the following extensions to the vertex-centric 
> iterations of Gelly:
> - allow vertices to access their in/out degrees and the total number of 
> vertices of the graph, inside the iteration.
> - allow choosing the neighborhood type (in/out/all) over which to run the 
> vertex-centric iteration. Now, the model uses the updates of the in-neighbors 
> to calculate state and send messages to out-neighbors. We could add a 
> parameter with value "in/out/all" to the {{VertexUpdateFunction}} and 
> {{MessagingFunction}}, that would indicate the type of neighborhood.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to