[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501354#comment-14501354 ]
ASF GitHub Bot commented on FLINK-1523: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643926 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java --- @@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) { if (this.initialVertices == null) { throw new IllegalStateException("The input data set has not been set."); } - + // prepare some type information - TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType(); TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0); TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType); - final int[] zeroKeyPos = new int[] {0}; - - final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration = - this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos); + // create a graph + Graph<VertexKey, VertexValue, EdgeValue> graph = + Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment()); - // set up the iteration operator - if (this.configuration != null) { + // check whether the numVertices option is set and, if so, compute the total number of vertices + // and set it within the messaging and update functions - iteration.name(this.configuration.getName( - "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")")); - iteration.parallelism(this.configuration.getParallelism()); - iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory()); - - // register all aggregators - for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) { - iteration.registerAggregator(entry.getKey(), entry.getValue()); 
+ if (this.configuration != null && this.configuration.isOptNumVertices()) { + try { + long numberOfVertices = graph.numberOfVertices(); + messagingFunction.setNumberOfVertices(numberOfVertices); + updateFunction.setNumberOfVertices(numberOfVertices); + } catch (Exception e) { + e.printStackTrace(); } } - else { - // no configuration provided; set default name - iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"); - } - - // build the messaging function (co group) - CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages; - MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo); - messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger); - - // configure coGroup message function with name and broadcast variables - messages = messages.name("Messaging"); - if (this.configuration != null) { - for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) { - messages = messages.withBroadcastSet(e.f1, e.f0); - } + if(this.configuration != null) { + messagingFunction.setDirection(this.configuration.getDirection()); + } else { + messagingFunction.setDirection(EdgeDirection.OUT); } - - VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes); - - // build the update function (co group) - CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates = - messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); - - // configure coGroup update function with name and broadcast variables - updates = updates.name("Vertex State Updates"); - if (this.configuration != null) { - for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) { - updates = updates.withBroadcastSet(e.f1, e.f0); - } - } + // retrieve the 
direction in which the updates are made and in which the messages are sent + EdgeDirection messagingDirection = messagingFunction.getDirection(); - // let the operator know that we preserve the key field - updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0"); - - return iteration.closeWith(updates, updates); - + DataSet<Tuple2<VertexKey, Message>> messages = null; --- End diff -- Why create a null dataset and pass it as a parameter to the methods below? > Vertex-centric iteration extensions > ----------------------------------- > > Key: FLINK-1523 > URL: https://issues.apache.org/jira/browse/FLINK-1523 > Project: Flink > Issue Type: Improvement > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Andra Lungu > > We would like to make the following extensions to the vertex-centric > iterations of Gelly: > - allow vertices to access their in/out degrees and the total number of > vertices of the graph, inside the iteration. > - allow choosing the neighborhood type (in/out/all) over which to run the > vertex-centric iteration. Currently, the model uses the updates of the in-neighbors > to calculate state and send messages to out-neighbors. We could add a > parameter with value "in/out/all" to the {{VertexUpdateFunction}} and > {{MessagingFunction}} that would indicate the type of neighborhood. -- This message was sent by Atlassian JIRA (v6.3.4#6332)