Re: Flink cluster dev environment in Docker
+1 for the idea. I'm cross-posting this to the dev@ list, FYI. - Henry On Tue, Mar 17, 2015 at 2:54 AM, Robert Metzger wrote: > Hey Emmanuel, > > thank you for this great contribution. I'm going to test the Docker > deployment soon. > > I would actually like to include the files in the Flink source repository, > either in the "flink-contrib" module or in the "tools" directory. > What's the take of the other committers on this? > > On Tue, Mar 17, 2015 at 10:01 AM, Flavio Pompermaier > wrote: >> >> Great addition! >> >> On Tue, Mar 17, 2015 at 5:11 AM, Emmanuel wrote: >>> >>> FYI >>> >>> Posted my dev cluster deployment in Docker here: >>> >>> https://github.com/streamnsight/docker-flink >>> >>> I still need to work on aggregating the logs, but I hope it can get people >>> started easily. >>> >>> Cheers
[jira] [Created] (FLINK-1755) NullPointerException in LocalInputChannel.getNextLookAhead()
Robert Metzger created FLINK-1755: - Summary: NullPointerException in LocalInputChannel.getNextLookAhead() Key: FLINK-1755 URL: https://issues.apache.org/jira/browse/FLINK-1755 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.9 Reporter: Robert Metzger Failed on Travis, on this commit: https://github.com/rmetzger/flink/commit/6f78a6eaa3e783511ebe706ce508f54b91ceafb0 with this profile: {{-P!include-yarn -Dhadoop.version=2.0.0-alpha}}. The commit ({code}[ml] [tests] Force pipelined execution of ALSITCase{code}) is already included.
{code}
---
 T E S T S
---
Running org.apache.flink.ml.feature.PolynomialBaseITCase
Running org.apache.flink.ml.recommendation.ALSITCase
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.782 sec - in org.apache.flink.ml.feature.PolynomialBaseITCase
Running org.apache.flink.ml.regression.MultipleLinearRegressionITCase
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 18.515 sec - in org.apache.flink.ml.recommendation.ALSITCase
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 11.263 sec <<< FAILURE! - in org.apache.flink.ml.regression.MultipleLinearRegressionITCase
testEstimationOfCubicFunction(org.apache.flink.ml.regression.MultipleLinearRegressionITCase)  Time elapsed: 1.839 sec  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:295)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:89)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:182)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.finish(PipelinedSubpartition.java:116)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:261)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:220)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:190)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:179)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.finish(PipelinedSubpartition.java:116)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:261)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:220)
    at java.lang.Thread.run(Thread.java:745)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1754) Deadlock in job execution
Sebastian Kruse created FLINK-1754: -- Summary: Deadlock in job execution Key: FLINK-1754 URL: https://issues.apache.org/jira/browse/FLINK-1754 Project: Flink Issue Type: Bug Reporter: Sebastian Kruse I have encountered a reproducible deadlock in the execution of one of my jobs. The part of the plan where this happens is the following:
{code:java}
/** Performs the reduction via creating transitive INDs and removing them from the original IND set. */
private DataSet> calculateTransitiveReduction1(DataSet> inclusionDependencies) {

    // Concatenate INDs (only one hop).
    DataSet> transitiveInds = inclusionDependencies
            .flatMap(new SplitInds())
            .joinWithTiny(inclusionDependencies)
            .where(1).equalTo(0)
            .with(new ConcatenateInds());

    // Remove the concatenated INDs to come up with a transitive reduction of the INDs.
    return inclusionDependencies
            .coGroup(transitiveInds)
            .where(0).equalTo(0)
            .with(new RemoveTransitiveInds());
}
{code}
Apparently, the flatMap operator waits indefinitely for a free buffer to write to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
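For context: {{inclusionDependencies}} feeds both inputs of the downstream operators, so the plan forms a "diamond". With fully pipelined data exchanges, such branches can deadlock when one path stalls and exhausts the network buffers the other path needs. A minimal, hypothetical sketch of the same plan shape (names and data invented for illustration; whether this exact plan deadlocks depends on buffer configuration and parallelism):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;

public class DiamondPlan {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // One intermediate result...
        DataSet<Long> source = env.generateSequence(1, 1000000);

        // ...feeding two paths: a transformed branch and the raw branch.
        DataSet<Long> transformed = source.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value + 1;
            }
        });

        KeySelector<Long, Long> identity = new KeySelector<Long, Long>() {
            @Override
            public Long getKey(Long value) {
                return value;
            }
        };

        // The join merges the two branches again, closing the diamond. It must
        // consume both inputs, so if one branch blocks on buffer availability,
        // the whole pipeline can grind to a halt.
        transformed.join(source)
                .where(identity).equalTo(identity)
                .print();
    }
}
{code}
This is also the scenario targeted by the blocking data exchanges mentioned in FLINK-1747 below.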
[jira] [Created] (FLINK-1753) Add more tests for Kafka Connectors
Robert Metzger created FLINK-1753: - Summary: Add more tests for Kafka Connectors Key: FLINK-1753 URL: https://issues.apache.org/jira/browse/FLINK-1753 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.9 Reporter: Robert Metzger The current {{KafkaITCase}} only performs a single test. We need to refactor it so that it brings up a Kafka/ZooKeeper server and then performs various tests. Tests to include:
- a topology with non-string types
- a topology with a custom Kafka partitioner class
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
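To sketch the second test case: a hypothetical custom partitioner of the kind such a test would exercise (this assumes the Kafka 0.8.x producer API that the connector used at the time; the class itself is invented for illustration):
{code:java}
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// A hypothetical partitioner: routes records to partitions by key hash.
public class ModuloPartitioner implements Partitioner {

    // Kafka instantiates partitioners reflectively through this constructor.
    public ModuloPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Mask the sign bit so the result is always a valid partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
{code}
The test would then produce records through this partitioner and assert that each record arrives in the expected partition.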
[jira] [Created] (FLINK-1752) Add test to Kafka streaming connector
Robert Metzger created FLINK-1752: - Summary: Add test to Kafka streaming connector Key: FLINK-1752 URL: https://issues.apache.org/jira/browse/FLINK-1752 Project: Flink Issue Type: Improvement Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Add a test for Flink Streaming's Kafka connector using an integrated Kafka and ZooKeeper server. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1751) Update CLI documentation page
Maximilian Michels created FLINK-1751: - Summary: Update CLI documentation page Key: FLINK-1751 URL: https://issues.apache.org/jira/browse/FLINK-1751 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 0.9 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Trivial Fix For: 0.9 http://ci.apache.org/projects/flink/flink-docs-master/cli.html The command-line options have been changed. This should be reflected in the documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Submitting small PRs rather than massive ones
Great suggestion and observation, Max. +1 Yes, we should also split PRs into proper, logical commits; that will definitely help with reviews. Like you said, some PRs are just large in nature and splitting them into pieces may not work well, so making the right commits should go hand in hand with small PRs as necessary. What I mean by smaller PRs is making sure one PR tries to solve one logical issue at a time. If we can split the work into smaller PRs by designing the solution properly, it would also keep the history more manageable. Think of it as one agile sprint story =) - Henry On Thu, Mar 19, 2015 at 2:27 AM, Maximilian Michels wrote: > I agree with you, Henry. Reviewing hundreds of changed class files is > difficult and nearly impossible to do exhaustively. > > However, splitting up pull requests also has some drawbacks. For example, > discussions and comments are also split up and harder to keep up with. > Also, pull requests might depend on other pull requests. > > Therefore, I would advise making use of Git's power and splitting up pull > requests into as many logical commits as possible. Individual commits can > be reviewed just like individual pull requests, with the advantage that > they can build on each other. > > Max > > On Thu, Mar 19, 2015 at 10:17 AM, Ufuk Celebi wrote: >> >> On 19 Mar 2015, at 09:43, Stephan Ewen wrote: >> >> > I like this proposal very much. We should do that as much as possible. >> >> Same here. Makes it also easier to track progress. >> >> (I think this should go hand in hand with better design descriptions in >> the corresponding JIRAs.)
Re: [DISCUSS] Submitting small PRs rather than massive ones
+1 to that, Ufuk. Making JIRAs more descriptive, with the design included, would make them easier to review before jumping to the diff in the PRs. On Thu, Mar 19, 2015 at 2:17 AM, Ufuk Celebi wrote: > > On 19 Mar 2015, at 09:43, Stephan Ewen wrote: > >> I like this proposal very much. We should do that as much as possible. > > Same here. Makes it also easier to track progress. > > (I think this should go hand in hand with better design descriptions in the > corresponding JIRAs.)
Re: [DISCUSS] Submitting small PRs rather than massive ones
Yeah, renamings will naturally involve large file changes. On Thu, Mar 19, 2015 at 1:43 AM, Stephan Ewen wrote: > I like this proposal very much. We should do that as much as possible. > > Pull requests with renamings easily add up to many files; it is harder there. > On 18.03.2015 19:39, "Henry Saputra" wrote: > >> Hi All, >> >> Recently there have been some PRs with massive changes which include >> multiple JIRA tickets. >> >> It is getting tougher to review them and also to back-port changes if needed. >> >> To help reviewers review the changes, let's try to submit small >> but frequent PRs to make them easier to review. >> Not to mention the GitHub UI suffers with diffs over 200 files and >> thousands of lines of code changes =) >> >> When committing to ASF git it should be fine to combine one day of >> work, but PRs should be as isolated as possible. >> >> Exceptions such as a new module like Gelly or ML may be OK, but changes >> that touch the execution flow should be done in smaller >> batches if possible. >> >> Thanks, >> >> Henry >>
Re: Overview of Memory Management in Flink
We should have all parts of Flink documented that well. Very nice overview, Stephan! Two comments: Regarding the lazy memory segment allocation: besides the problem of initial garbage collection of these dynamically allocated segments, wouldn't it also have the disadvantage that we could not guarantee a fixed amount of memory? How would we ensure that the user memory does not take away the managed memory? I would add a note on why sun.misc.Unsafe is faster for longs. I would guess it is because of the encoding of the long (little or big endian). Best, Max On Wed, Mar 18, 2015 at 5:21 PM, Stephan Ewen wrote: > Hi all! > > Here is a first version of the documentation of how memory management works in > Flink. > > I hope it sheds some light on the magic we do. Let me know if certain > sections are still confusing. > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 > > Greetings, > Stephan >
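On that last point, a minimal sketch (plain JDK-internal sun.misc.Unsafe, JDK 8 era; not Flink's actual MemorySegment code) contrasting a single 8-byte store in native byte order with a manual per-byte encoding:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeLongWrite {

    private static final Unsafe UNSAFE;

    static {
        try {
            // Unsafe is not meant to be obtained directly; grab it via reflection.
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // One 8-byte store in the CPU's native byte order; no per-byte work.
    static void putLongUnsafe(byte[] memory, int index, long value) {
        UNSAFE.putLong(memory, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + index, value);
    }

    // Manual big-endian encoding: eight shift-and-mask stores, with the byte
    // order fixed by the code rather than by the CPU.
    static void putLongManual(byte[] memory, int index, long value) {
        for (int i = 0; i < 8; i++) {
            memory[index + i] = (byte) (value >>> (56 - 8 * i));
        }
    }
}

The first variant boils down to a single memory write (the JIT intrinsifies putLong), whereas the second pays for eight dependent shift/store operations; on a little-endian CPU the manual big-endian layout additionally fights the native byte order, which is presumably what Max is alluding to.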
Re: Queries regarding RDFs with Flink
Hi to all, I'm back to this task again :) Summarizing again: I have a source dataset that contains RDF "stars" (subject URI, RDF type, and a list of RDF triples belonging to this subject -> a.k.a. the "star schema"), and I have to extract some sub-graphs for some RDF types of interest. As described in the previous email, I'd like to expand some root node (if its type is of interest) and explode some of its path(s). For example, if I'm interested in the expansion of the RDF type Person (as in the example), I might want to create a mini-graph with all of its triples plus those obtained by exploding the path(s) knows.marriedWith and knows.knows.knows. At the moment I do it with point gets from HBase, but I couldn't figure out whether this could be done more efficiently with other strategies in Flink. @Vasiliki: you said that I could need "something like a BFS from each vertex". Do you have an example that could fit my use case? Is it possible to filter out those vertices I'm interested in? Thanks in advance, Flavio On Tue, Mar 3, 2015 at 8:32 PM, Vasiliki Kalavri wrote: > Hi Flavio, > > if you want to use Gelly to model your data as a graph, you can load your > Tuple3s as Edges. > This will result in "http://test/John", "Person", "Frank", etc. becoming > vertices and "type", "name", "knows" becoming edge values. > In the first case, you can use filterOnEdges() to get the subgraph with the > relation edges. > > Once you have the graph, you could probably use a vertex-centric iteration > to generate the trees. > It seems to me that you need something like a BFS from each vertex. Keep in > mind that this can be a very costly operation in terms of memory and > communication for large graphs. > > Let me know if you have any questions! > > Cheers, > V. > > On 3 March 2015 at 09:13, Flavio Pompermaier wrote: > > > I have a nice case of RDF manipulation :) > > Let's say I have the following RDF triples (Tuple3) in two files or > tables: > > > > TABLE A: > > http://test/John, type, Person > > http://test/John, name, John > > http://test/John, knows, http://test/Mary > > http://test/John, knows, http://test/Jerry > > http://test/Jerry, type, Person > > http://test/Jerry, name, Jerry > > http://test/Jerry, knows, http://test/Frank > > http://test/Mary, type, Person > > http://test/Mary, name, Mary > > > > TABLE B: > > http://test/Frank, type, Person > > http://test/Frank, name, Frank > > http://test/Frank, marriedWith, http://test/Mary > > > > What is the best way to build up Person-rooted trees with all the node's data > > properties and some expanded paths like 'Person.knows.marriedWith'? > > Is it better to use Graph/Gelly APIs, Flink joins, multiple point gets > > from a key/value store, or what? 
> > > > The expected 4 trees should be: > > > > tree 1 (root is John) -- > > http://test/John, type, Person > > http://test/John, name, John > > http://test/John, knows, http://test/Mary > > http://test/John, knows, http://test/Jerry > > http://test/Jerry, type, Person > > http://test/Jerry, name, Jerry > > http://test/Jerry, knows, http://test/Frank > > http://test/Mary, type, Person > > http://test/Mary, name, Mary > > http://test/Frank, type, Person > > http://test/Frank, name, Frank > > http://test/Frank, marriedWith, http://test/Mary > > > > tree 2 (root is Jerry) -- > > http://test/Jerry, type, Person > > http://test/Jerry, name, Jerry > > http://test/Jerry, knows, http://test/Frank > > http://test/Frank, type, Person > > http://test/Frank, name, Frank > > http://test/Frank, marriedWith, http://test/Mary > > http://test/Mary, type, Person > > http://test/Mary, name, Mary > > > > tree 3 (root is Mary) -- > > http://test/Mary, type, Person > > http://test/Mary, name, Mary > > > > tree 4 (root is Frank) -- > > http://test/Frank, type, Person > > http://test/Frank, name, Frank > > http://test/Frank, marriedWith, http://test/Mary > > http://test/Mary, type, Person > > http://test/Mary, name, Mary > > > > Thanks in advance, > > Flavio > > > > On Mon, Mar 2, 2015 at 5:04 PM, Stephan Ewen wrote: > > > Hey Santosh! > > > > > > RDF processing often involves either joins or graph-query-like > > operations > > > (transitive). Flink is fairly good at both types of operations. > > > > > > I would look into the graph examples and the graph API for a start: > > > > > > - Graph examples: > > > > > > > > > https://github.com/apache/flink/tree/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph > > > - Graph API: > > > > > > > > > https://github.com/apache/flink/tree/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph > > > > > > If you have a more specific question, I can give you better pointers > ;-) > > > > > > Stephan > > > > > > > > > On Fri, Feb 27, 2015 at 4:48 PM, santosh_rajaguru > > > wrote: > > > > Hello, > > > > > > > > how can flink be useful for processing the data to RDFs and build the > > > > on
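Returning to Vasiliki's filterOnEdges() suggestion: a minimal sketch of the first step (loading triples as edges and keeping only the relation edges), written against the Gelly API in flink-staging at the time. The data is abbreviated from the example above, and this is an illustration rather than a full solution:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class RdfRelationSubgraph {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Each triple (subject, predicate, object) becomes an edge from
        // subject to object whose value is the predicate.
        DataSet<Edge<String, String>> triples = env.fromElements(
                new Edge<>("http://test/John", "http://test/Mary", "knows"),
                new Edge<>("http://test/John", "John", "name"),
                new Edge<>("http://test/Frank", "http://test/Mary", "marriedWith"));

        Graph<String, NullValue, String> graph = Graph.fromDataSet(triples, env);

        // Keep only relation edges; datatype properties such as "name" and
        // "type" are dropped, leaving the subgraph to traverse.
        Graph<String, NullValue, String> relations = graph.filterOnEdges(
                new FilterFunction<Edge<String, String>>() {
                    @Override
                    public boolean filter(Edge<String, String> edge) {
                        return edge.getValue().equals("knows")
                                || edge.getValue().equals("marriedWith");
                    }
                });

        relations.getEdges().print();
    }
}

From there, a vertex-centric iteration that floods each root's id along the relation edges would realize the per-vertex BFS Vasiliki describes, with the memory and communication caveats she mentions.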
[jira] [Created] (FLINK-1750) Add canonical correlation analysis (CCA) to machine learning library
Till Rohrmann created FLINK-1750: Summary: Add canonical correlation analysis (CCA) to machine learning library Key: FLINK-1750 URL: https://issues.apache.org/jira/browse/FLINK-1750 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Canonical correlation analysis (CCA) [1] can be used to find correlated features between two random variables. Moreover, CCA can be used for dimensionality reduction. Maybe the work of Jia Chen and Ioannis D. Schizas [2] can be adapted to realize a distributed CCA with Flink. Resources: [1] [http://en.wikipedia.org/wiki/Canonical_correlation] [2] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=6810359] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
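For reference, the standard CCA objective from the literature (a textbook statement, not taken from the ticket): find projection directions a and b that maximize the correlation between the projected variables a^T X and b^T Y,

\[
\rho = \max_{a,\,b} \frac{a^{\top} \Sigma_{XY}\, b}{\sqrt{a^{\top} \Sigma_{XX}\, a}\,\sqrt{b^{\top} \Sigma_{YY}\, b}}
\]

where \Sigma_{XX} and \Sigma_{YY} are the covariance matrices of X and Y and \Sigma_{XY} is their cross-covariance. The maximizers reduce to a generalized eigenvalue problem, which is the part a distributed implementation would have to tackle.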
[jira] [Created] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library
Till Rohrmann created FLINK-1749: Summary: Add Boosting algorithm for ensemble learning to machine learning library Key: FLINK-1749 URL: https://issues.apache.org/jira/browse/FLINK-1749 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Boosting [1] can help to create a strong learner from an ensemble of weak learners and thus improve the overall performance. Widely used boosting algorithms are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] investigates how boosting can be efficiently realised in a distributed setting. Resources: [1] [http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29] [2] [http://en.wikipedia.org/wiki/AdaBoost] [3] [http://en.wikipedia.org/wiki/LogitBoost] [4] [http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
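To make the mechanism concrete, a plain-Java sketch of one AdaBoost round (the textbook update, assuming labels and weak-learner predictions in {-1, +1}, weights normalized on input, and 0 < weighted error < 1; this is not flink-ml API):
{code:java}
public class AdaBoostRound {

    /**
     * Performs one AdaBoost round: computes the weak learner's vote alpha
     * from its weighted error and re-weights the training examples in place.
     */
    static double boostingRound(int[] labels, int[] predictions, double[] weights) {
        // Weighted error of the weak learner on the current distribution.
        double eps = 0.0;
        for (int i = 0; i < labels.length; i++) {
            if (predictions[i] != labels[i]) {
                eps += weights[i];
            }
        }

        // The better the learner (smaller eps), the larger its vote alpha.
        double alpha = 0.5 * Math.log((1 - eps) / eps);

        // Misclassified examples gain weight, correctly classified ones lose it.
        double sum = 0.0;
        for (int i = 0; i < weights.length; i++) {
            weights[i] *= Math.exp(-alpha * labels[i] * predictions[i]);
            sum += weights[i];
        }
        for (int i = 0; i < weights.length; i++) {
            weights[i] /= sum; // renormalize to a distribution
        }
        return alpha;
    }
}
{code}
The interesting part in a distributed setting is exactly this re-weighting step, since the weighted error and the normalization couple all examples across partitions.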
[jira] [Created] (FLINK-1748) Integrate PageRank implementation into machine learning library
Till Rohrmann created FLINK-1748: Summary: Integrate PageRank implementation into machine learning library Key: FLINK-1748 URL: https://issues.apache.org/jira/browse/FLINK-1748 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann We already have an excellent approximate PageRank [1] implementation, contributed by [~StephanEwen]. Making this implementation part of the machine learning library would be a great addition. Resources: [1] [http://en.wikipedia.org/wiki/PageRank] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1747) Remove deadlock detection and pipeline breaker placement in optimizer
Ufuk Celebi created FLINK-1747: -- Summary: Remove deadlock detection and pipeline breaker placement in optimizer Key: FLINK-1747 URL: https://issues.apache.org/jira/browse/FLINK-1747 Project: Flink Issue Type: Improvement Components: Optimizer Affects Versions: master Reporter: Ufuk Celebi Priority: Minor The deadlock detection in the optimizer, which places pipeline-breaking caches, has become redundant with recently added changes. We now use blocking data exchanges for branching programs, which are merged again at a later point. Therefore, we can start removing the respective code from the optimizer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1746) Add linear discriminant analysis to machine learning library
Till Rohrmann created FLINK-1746: Summary: Add linear discriminant analysis to machine learning library Key: FLINK-1746 URL: https://issues.apache.org/jira/browse/FLINK-1746 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Linear discriminant analysis (LDA) [1] is used for dimensionality reduction prior to classification, but it can also be used as a linear classifier on its own. Since dimensionality reduction is an important preprocessing step, a distributed LDA implementation would be a valuable addition to flink-ml. Resources: [1] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5946724] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1745) Add k-nearest-neighbours algorithm to machine learning library
Till Rohrmann created FLINK-1745: Summary: Add k-nearest-neighbours algorithm to machine learning library Key: FLINK-1745 URL: https://issues.apache.org/jira/browse/FLINK-1745 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite simple, it is still used as a means to classify data and to do regression. Could be a starter task. Resources: [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
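For illustration, a compact single-machine sketch of kNN classification in plain Java (an invented helper, not flink-ml code); a distributed version would mainly parallelize the distance computation and the neighbour search [2]:
{code:java}
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class Knn {

    /** Returns the majority label among the k training points closest to the query. */
    static int classify(final double[][] points, int[] labels, final double[] query, int k) {
        Integer[] order = new Integer[points.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }

        // Sort training points by squared Euclidean distance to the query.
        Arrays.sort(order, new Comparator<Integer>() {
            @Override
            public int compare(Integer a, Integer b) {
                return Double.compare(dist2(points[a], query), dist2(points[b], query));
            }
        });

        // Majority vote over the k nearest neighbours.
        Map<Integer, Integer> votes = new HashMap<>();
        int best = -1;
        int bestCount = 0;
        for (int i = 0; i < k && i < order.length; i++) {
            int label = labels[order[i]];
            int count = votes.merge(label, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                best = label;
            }
        }
        return best;
    }

    private static double dist2(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }
}
{code}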
Re: [DISCUSS] Submitting small PRs rather than massive ones
I agree with you, Henry. Reviewing hundreds of changed class files is difficult and nearly impossible to do exhaustively. However, splitting up pull requests also has some drawbacks. For example, discussions and comments are also split up and harder to keep up with. Also, pull requests might depend on other pull requests. Therefore, I would advise making use of Git's power and splitting up pull requests into as many logical commits as possible. Individual commits can be reviewed just like individual pull requests, with the advantage that they can build on each other. Max On Thu, Mar 19, 2015 at 10:17 AM, Ufuk Celebi wrote: > > On 19 Mar 2015, at 09:43, Stephan Ewen wrote: > > > I like this proposal very much. We should do that as much as possible. > > Same here. Makes it also easier to track progress. > > (I think this should go hand in hand with better design descriptions in > the corresponding JIRAs.)
Re: [DISCUSS] Submitting small PRs rather than massive ones
On 19 Mar 2015, at 09:43, Stephan Ewen wrote: > I like this proposal very much. We should do that as much as possible. Same here. Makes it also easier to track progress. (I think this should go hand in hand with better design descriptions in the corresponding JIRAs.)
Re: [DISCUSS] Submitting small PRs rather than massive ones
I like this proposal very much. We should do that as much as possible. Pull requests with renamings easily add up to many files; it is harder there. On 18.03.2015 19:39, "Henry Saputra" wrote: > Hi All, > > Recently there have been some PRs with massive changes which include > multiple JIRA tickets. > > It is getting tougher to review them and also to back-port changes if needed. > > To help reviewers review the changes, let's try to submit small > but frequent PRs to make them easier to review. > Not to mention the GitHub UI suffers with diffs over 200 files and > thousands of lines of code changes =) > > When committing to ASF git it should be fine to combine one day of > work, but PRs should be as isolated as possible. > > Exceptions such as a new module like Gelly or ML may be OK, but changes > that touch the execution flow should be done in smaller > batches if possible. > > Thanks, > > Henry >
Re: [Delta Iterations] The dirty insides(insights)
Did you send an empty email to user-subscr...@flink.apache.org ? That should subscribe you. On Thu, Mar 19, 2015 at 9:25 AM, Andra Lungu wrote:
> Hello,
>
> I've used delta iterations several times up until now, but I just realized
> that I never fully understood what happens inside, and the documentation
> only explains things from a user's perspective. This is why I could really
> use your help :).
>
> Here goes nothing:
> In Gelly's Graph.java, there is a nice runVertexCentricIteration(...)
> method, which is in fact Spargel's, and which disguises a delta iteration.
> What I am trying to do is to set the vertex value before running:
>
> DataSet> newVertices =
>     verticesWithInDegrees.runOperation(iteration);
>
> The problem is that after this runOperation, the vertex values get reset.
> So I looked at VertexCentricIteration.java's createResult().
> It's a plain delta iteration that (more or less) looks like this:
>
> final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
>     this.initialVertices.iterateDelta(this.initialVertices,
>         this.maximumNumberOfIterations, zeroKeyPos);
>
> // configure coGroup update function with name and broadcast variables
> updates = updates.name("Vertex State Updates");
> for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
>     updates = updates.withBroadcastSet(e.f1, e.f0);
> }
>
> // let the operator know that we preserve the key field
> updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
>
> return iteration.closeWith(updates, updates);
>
> The DataSet in the for loop is the problem: vertex values get reset.
> Can someone give me a hint on how to propagate the vertex value throughout
> the iteration?
>
> Thanks!
> Andra
>
> P.S. Could someone please subscribe me to the @user mailing list? For some
> reason, sending the mail to that recipient fails. Thanks!
[Delta Iterations] The dirty insides(insights)
Hello, I've used delta iterations several times up until now, but I just realized that I never fully understood what happens inside, and the documentation only explains things from a user's perspective. This is why I could really use your help :). Here goes nothing: In Gelly's Graph.java, there is a nice runVertexCentricIteration(...) method, which is in fact Spargel's, and which disguises a delta iteration. What I am trying to do is to set the vertex value before running:

DataSet> newVertices = verticesWithInDegrees.runOperation(iteration);

The problem is that after this runOperation, the vertex values get reset. So I looked at VertexCentricIteration.java's createResult(). It's a plain delta iteration that (more or less) looks like this:

final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
    this.initialVertices.iterateDelta(this.initialVertices,
        this.maximumNumberOfIterations, zeroKeyPos);

// configure coGroup update function with name and broadcast variables
updates = updates.name("Vertex State Updates");
for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
    updates = updates.withBroadcastSet(e.f1, e.f0);
}

// let the operator know that we preserve the key field
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");

return iteration.closeWith(updates, updates);

The DataSet in the for loop is the problem: vertex values get reset. Can someone give me a hint on how to propagate the vertex value throughout the iteration? Thanks! Andra P.S. Could someone please subscribe me to the @user mailing list? For some reason, sending the mail to that recipient fails. Thanks!
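For what it's worth, a minimal, self-contained delta-iteration sketch (plain DataSet API with invented data; not Gelly code) showing the solution-set semantics behind closeWith(updates, updates): only elements emitted into the solution-set delta are overwritten, while every other element keeps its value from the initial solution set. So values that appear to "reset" usually trace back to how the initial solution set was built, not to the iteration itself.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                new Tuple2<>(1L, 10L), new Tuple2<>(2L, 20L), new Tuple2<>(3L, 30L));

        // Solution set and initial workset are both `initial`; field 0 is the key.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initial.iterateDelta(initial, 10, 0);

        // Pretend only the element with key 1 keeps changing: increment its value.
        DataSet<Tuple2<Long, Long>> updates = iteration.getWorkset()
                .join(iteration.getSolutionSet())
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> join(Tuple2<Long, Long> workset, Tuple2<Long, Long> solution) {
                        return new Tuple2<>(solution.f0, solution.f1 + 1);
                    }
                })
                .filter(new FilterFunction<Tuple2<Long, Long>>() {
                    @Override
                    public boolean filter(Tuple2<Long, Long> value) {
                        return value.f0 == 1L;
                    }
                });

        // As in Spargel's createResult(): the updates serve as both the
        // solution-set delta and the next workset.
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(updates, updates);

        // Keys 2 and 3 come out with their initial values (20, 30); only key 1
        // was overwritten (10 plus the number of iterations).
        result.print();
    }
}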