[jira] [Commented] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
[ https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15168808#comment-15168808 ]

Gabor Gevay commented on FLINK-3519:
------------------------------------

So you mean that you are in favor of 2?

> Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-3519
>                 URL: https://issues.apache.org/jira/browse/FLINK-3519
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Priority: Minor
>
> If I have a subclass of TupleN, then objects of this type will turn into TupleNs when I try to use them in a DataSet.
> For example, if I have a class like this:
> {code}
> public static class Foo extends Tuple1<Integer> {
>     public short a;
>     public Foo() {}
>     public Foo(int f0, int a) {
>         this.f0 = f0;
>         this.a = (short)a;
>     }
>     @Override
>     public String toString() {
>         return "(" + f0 + ", " + a + ")";
>     }
> }
> {code}
> And then I do this:
> {code}
> env.fromElements(0, 0, 0).map(new MapFunction<Integer, Tuple1<Integer>>() {
>     @Override
>     public Tuple1<Integer> map(Integer value) throws Exception {
>         return new Foo(5, 6);
>     }
> }).print();
> {code}
> Then I don't have Foos in the output, but only Tuples:
> {code}
> (5)
> (5)
> (5)
> {code}
> The problem is caused by the TupleSerializer not caring about subclasses at all. I guess the reason for this is performance: we don't want to deal with writing and reading subclass tags when we have Tuples.
> I see three options for solving this:
> 1. Add subclass tags to the TupleSerializer: This is not really an option, because we don't want to lose performance.
> 2. Document this behavior in the javadoc of the Tuple classes.
> 3. Make the Tuple types final: this would be the clean solution, but it is API breaking, and the first victim would be Gelly: the Vertex and Edge types extend from tuples. (Note that the issue doesn't appear there, because the DataSets there always have the type of the descendant class.)
> When deciding between 2. and 3., an important point to note is that if you have your class extend from a Tuple type instead of just adding the f0, f1, ... fields manually in the hopes of getting the performance boost associated with Tuples, then you are out of luck: the PojoSerializer will kick in anyway when the declared types of your DataSets are the descendant type.
> If someone knows about a good reason to extend from a Tuple class, then please comment.
> For 2., this is a suggested wording for the javadoc of the Tuple classes:
> Warning: Please don't subclass Tuple classes, but if you do, then be sure to always declare the element type of your DataSets to your descendant type. (That is, if you have a "class A extends Tuple2", then don't use instances of A in a DataSet<Tuple2>, but use DataSet<A>.)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
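The behavior described above can be reproduced without Flink in a few lines. The following is a hypothetical sketch (class and method names are made up, and the serializer is reduced to a single copy step) of how a serializer that only knows about the declared Tuple fields inevitably produces base-class instances, dropping subclass state:

```java
// Hypothetical mini-reproduction (NOT Flink code): a "serializer" that, like
// the TupleSerializer described in the issue, only copies the declared tuple
// fields and therefore always instantiates the base class.
public class TupleSubclassDemo {

    // Stand-in for Flink's Tuple1.
    static class Tuple1 {
        public Integer f0;
    }

    // Subclass with an extra field, as in the Foo example from the issue.
    static class Foo extends Tuple1 {
        public short a;
        public Foo(int f0, int a) { this.f0 = f0; this.a = (short) a; }
    }

    // Mimics a copy done by a serializer that knows only about the declared
    // field f0: the result is always a plain Tuple1, never a Foo.
    static Tuple1 copyLikeTupleSerializer(Tuple1 value) {
        Tuple1 copy = new Tuple1();
        copy.f0 = value.f0;
        return copy;
    }

    public static void main(String[] args) {
        Tuple1 copied = copyLikeTupleSerializer(new Foo(5, 6));
        // The subclass type (and the field 'a') is gone after the copy.
        System.out.println(copied instanceof Foo);  // false
        System.out.println(copied.f0);              // 5
    }
}
```

This also illustrates why option 2's advice works: if the declared element type were `Foo`, a serializer generated for `Foo` would know about the extra field.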
[jira] [Updated] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
[ https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-3519:
-------------------------------
    Affects Version/s: 1.0.0
[jira] [Created] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
Gabor Gevay created FLINK-3519:
----------------------------------

             Summary: Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
                 Key: FLINK-3519
                 URL: https://issues.apache.org/jira/browse/FLINK-3519
             Project: Flink
          Issue Type: Bug
          Components: Type Serialization System
            Reporter: Gabor Gevay
            Priority: Minor
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167415#comment-15167415 ]

Gabor Gevay commented on FLINK-3322:
------------------------------------

It's making it even worse:
false, 500m: 30s
false, 5000m: 115s
true, 500m: 8s
true, 5000m: 10s

> MemoryManager creates too much GC pressure with iterative jobs
> --------------------------------------------------------------
>
>                 Key: FLINK-3322
>                 URL: https://issues.apache.org/jira/browse/FLINK-3322
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Priority: Critical
>             Fix For: 1.0.0
>
> When taskmanager.memory.preallocate is false (the default), released memory segments are not added to a pool, but the GC is expected to take care of them. This puts too much pressure on the GC with iterative jobs, where the operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. (It will generate some lookup tables to /tmp on first run for a few minutes.)
> (I think the slowdown might also depend somewhat on taskmanager.memory.fraction, because more unused non-managed memory results in rarer GCs.)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167377#comment-15167377 ]

Gabor Gevay commented on FLINK-3322:
------------------------------------

I have constructed a much simpler example to demonstrate the problem: ConnectedComponents on a graph that is a path of length 1000 (1->2, 2->3, 3->4, 4->5, ... 999->1000):
https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc-2

The class to run is org.apache.flink.graph.example.ConnectedComponents. Try increasing the memory from e.g. 500m to 5000m, and look at the difference when taskmanager.memory.preallocate is true and false (TaskManager.scala:1713).

I measured the following times on my laptop:
false, 500m: 14s
false, 5000m: 115s
true, 500m: 8s
true, 5000m: 13s

(I guess that the difference between the two runs where preallocate is true is due to the time it takes for the JVM to allocate the memory once, but it should also be checked that this isn't caused by something else unexpected.)

So the bottom line is that the problem gets worse when there are more iterations. (We have 1001 iterations in the linked example.)
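The effect of preallocation described in this issue can be sketched in plain Java. This is a hypothetical illustration (SegmentPoolDemo and its methods are made up, not Flink's actual MemoryManager API) contrasting a pooled-segment strategy with a fresh allocation per superstep:

```java
// Hypothetical sketch: with preallocation, released segments go back into a
// pool and are reused across "supersteps"; without it, every superstep
// allocates fresh byte[]s that the GC must later collect.
import java.util.ArrayDeque;

public class SegmentPoolDemo {

    static final int SEGMENT_SIZE = 32 * 1024;
    final ArrayDeque<byte[]> pool = new ArrayDeque<>();
    long freshAllocations = 0;

    byte[] request(boolean preallocate) {
        if (preallocate && !pool.isEmpty()) {
            return pool.pollFirst();   // reuse: no new garbage produced
        }
        freshAllocations++;            // GC pressure: a new array each time
        return new byte[SEGMENT_SIZE];
    }

    void release(byte[] segment, boolean preallocate) {
        if (preallocate) {
            pool.addFirst(segment);    // keep it for the next superstep
        }                              // else: drop it; the GC collects it
    }

    static long allocationsOver(int supersteps, boolean preallocate) {
        SegmentPoolDemo mm = new SegmentPoolDemo();
        for (int i = 0; i < supersteps; i++) {
            byte[] seg = mm.request(preallocate);
            mm.release(seg, preallocate);
        }
        return mm.freshAllocations;
    }

    public static void main(String[] args) {
        System.out.println(allocationsOver(1000, false)); // 1000
        System.out.println(allocationsOver(1000, true));  // 1
    }
}
```

With 1001 iterations, the non-pooled path produces one short-lived allocation per segment per iteration, which matches the observed slowdown growing with the number of iterations.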
[jira] [Commented] (FLINK-3277) Use Value types in Gelly API
[ https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152504#comment-15152504 ]

Gabor Gevay commented on FLINK-3277:
------------------------------------

There is one extra indirection (pointer) when something is a CopyableValue, compared to when something is a primitive type like long. But this probably has only a very small performance effect.

> Use Value types in Gelly API
> ----------------------------
>
>                 Key: FLINK-3277
>                 URL: https://issues.apache.org/jira/browse/FLINK-3277
>             Project: Flink
>          Issue Type: Improvement
>          Components: Gelly
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>
> This would be a breaking change so the discussion needs to happen before the 1.0.0 release.
> I think it would benefit Flink to use {{Value}} types wherever possible. The {{Graph}} functions {{inDegrees}}, {{outDegrees}}, and {{getDegrees}} each return {{DataSet<Tuple2<K, Long>>}}. Using {{Long}} creates a new heap object for every serialization and deserialization. The mutable {{Value}} types do not suffer from this issue when object reuse is enabled.
> I lean towards a preference for conciseness in documentation and performance in examples and APIs.
[jira] [Commented] (FLINK-3277) Use Value types in Gelly API
[ https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152448#comment-15152448 ]

Gabor Gevay commented on FLINK-3277:
------------------------------------

Ah, yes, you are right, this wouldn't work with Tuples. It would only work with POJOs having `long` fields. So, it seems that the current performance difference between Tuples and POJOs would likely get reversed after code generation in the serializers is introduced.
[jira] [Commented] (FLINK-3277) Use Value types in Gelly API
[ https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152407#comment-15152407 ]

Gabor Gevay commented on FLINK-3277:
------------------------------------

I think that this issue (that new heap objects are created when deserializing a Tuple/POJO with a long field) will have a clean solution when we have code generation for the serializers. Then fields of primitive types (like long) can be handled specially: the generated code won't call the fieldSerializer, but will directly call readLong and write the result into the field.
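The allocation difference between boxed Long and a mutable Value-style type discussed in this thread can be illustrated with a small sketch. LongHolder below is a made-up stand-in for Flink's LongValue, and the loop bodies are only an analogy for what a deserializer does with object reuse enabled:

```java
// Hypothetical sketch: a mutable holder is overwritten in place per record
// (zero allocations on the hot path), while boxing may allocate per record.
public class ValueReuseDemo {

    // Mutable holder in the spirit of LongValue (made-up class).
    static class LongHolder {
        long value;
        void setValue(long v) { this.value = v; }
        long getValue() { return value; }
    }

    // With object reuse, the same holder instance is reused for each record.
    static long sumWithReuse(long[] records) {
        LongHolder reuse = new LongHolder();   // allocated once
        long sum = 0;
        for (long r : records) {
            reuse.setValue(r);                 // "deserialize" in place
            sum += reuse.getValue();
        }
        return sum;
    }

    // With boxed Long, each record may become a fresh heap object.
    static long sumWithBoxing(long[] records) {
        long sum = 0;
        for (long r : records) {
            // May allocate; only values in [-128, 127] are cached by the JVM.
            Long boxed = Long.valueOf(r);
            sum += boxed;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] data = {1L, 2L, 3L};
        System.out.println(sumWithReuse(data));   // 6
        System.out.println(sumWithBoxing(data));  // 6
    }
}
```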
[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-3322:
-------------------------------
    Priority: Critical  (was: Major)
[jira] [Created] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)
Gabor Gevay created FLINK-3394:
----------------------------------

             Summary: Clear up the contract of MutableObjectIterator.next(reuse)
                 Key: FLINK-3394
                 URL: https://issues.apache.org/jira/browse/FLINK-3394
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
    Affects Versions: 1.0.0
            Reporter: Gabor Gevay
            Priority: Critical

{{MutableObjectIterator.next(reuse)}} has the following contract (according to [~StephanEwen]'s comment \[1\]):
1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.
This should be documented in its javadoc (with "WARNING" so that people don't overlook it).
Additionally, since this was a "secret contract" up to now, all the 270 usages of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A few that are suspicious at first glance are in {{CrossDriver}}, {{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}.

\[1\] https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654
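The two-sided contract above can be sketched with a simplified interface shape modeled on MutableObjectIterator (everything below is illustrative, not Flink's actual code): next(reuse) may fill and return the reuse object or return a different instance, so a correct caller always continues with the returned reference, and a correct implementation never keeps a reference to what it returned.

```java
// Hypothetical sketch of the next(reuse) contract.
import java.util.Arrays;
import java.util.Iterator;

public class ReuseContractDemo {

    interface MutableObjectIterator<E> {
        E next(E reuse);   // null signals end of input
    }

    // Records are int[1] so they are mutable, like reused POJOs/Tuples.
    static class ListIterator implements MutableObjectIterator<int[]> {
        private final Iterator<Integer> source;
        ListIterator(Iterable<Integer> values) { this.source = values.iterator(); }

        @Override
        public int[] next(int[] reuse) {
            if (!source.hasNext()) return null;
            reuse[0] = source.next();  // fill the caller's reuse object...
            return reuse;              // ...and return it WITHOUT keeping it
        }
    }

    static int sum(MutableObjectIterator<int[]> it) {
        int sum = 0;
        int[] record = new int[1];
        // Correct call pattern: always reassign to the returned reference,
        // i.e. the caller does not hold onto the old 'reuse' object.
        while ((record = it.next(record)) != null) {
            sum += record[0];
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sum(new ListIterator(Arrays.asList(1, 2, 3)))); // 6
    }
}
```

A call site violates the contract if it passes `reuse` and then keeps using the old `reuse` reference instead of the returned one, which is exactly the pattern the issue asks to audit.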
[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)
[ https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-3394:
-------------------------------
    Fix Version/s: 1.0.0
[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15144755#comment-15144755 ]

Gabor Gevay commented on FLINK-3291:
------------------------------------

{quote}
The initial idea of a contract for MutableObjectIterator.next(reuse) was the following:
1. The caller may not hold onto reuse any more
2. The iterator implementor may not hold onto the returned object any more.
Given that this was long ago (5 years probably, since I created that interface), I am pretty sure that contract is not obeyed everywhere.
{quote}

OK, this clears things up; thanks for chiming in, [~StephanEwen]! Then the things we should do are:
1. Add this contract to the javadoc of `MutableObjectIterator.next(reuse)`.
2. Go with [~greghogan]'s solution \[1\] to fix the problem brought up by this Jira.
3. Check all calls to `MutableObjectIterator.next(reuse)`. (A few suspicious ones are in `CrossDriver`, `UnionWithTempOperator`, `MutableHashTable.ProbeIterator.next`, and `ReusingBuildFirstHashJoinIterator.callWithNextKey`.)

{quote}
Gabor Gevay, at this point are the changes to MergeIterator fixing a bug? Do you want to fix up and clarify the documentation for MutableObjectIterator and verify the implementing classes?
{quote}

My changes to `MergeIterator` become unnecessary if the contract of `MutableObjectIterator.next(reuse)` is what Stephan said. I'll open a Jira for fixing the javadoc of `MutableObjectIterator.next(reuse)` to include the contract (and possibly also add this info to a new wiki page that will be linked from the new version of the object reuse documentation brewing at \[2\]), and check all call sites whether they obey the contract.

\[1\] https://github.com/apache/flink/pull/1626
\[2\] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -----------------------------------------------------
>
>                 Key: FLINK-3291
>                 URL: https://issues.apache.org/jira/browse/FLINK-3291
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the `reuse` object that it got as an argument. This object might be modified later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will actually be MergeIterator.next(E reuse)) in the inner while loop of the objectReuseEnabled branch, and that calls top.nextHead with the reference that it got from ReduceDriver, which erroneously saves the reference, and then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives `reuse` to different `top`s in different calls, and then the heads end up being the same object.
> You can observe the latter situation in action by running ReducePerformance here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then watch `reuse`, and the heads of the first two elements of `this.heap` in the debugger. They will get to be the same object after hitting continue about 6 times.
> You can also look at the count that is printed at the end, which shouldn't be larger than the key range. Also, if you look into the output file /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect performance: MergeIterator.HeadStream could have a reuse object of its own as a member, and give that to iterator.next in nextHead(E reuse). And then we wouldn't need the overload of nextHead that has the reuse parameter, and MergeIterator.next(E reuse) could just call its other overload.
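The fix idea in the last paragraph (giving HeadStream a reuse object of its own) can be illustrated like this; the code is a hypothetical simplification, not Flink's actual MergeIterator, but the class and method names follow the issue text:

```java
// Hypothetical sketch of the proposed fix: each head stream keeps a PRIVATE
// reuse object, so the merge iterator never stores a reference to the
// caller's reuse object in a head.
public class HeadStreamDemo {

    static class HeadStream {
        private final int[] input;           // stand-in for the sorted input
        private int pos = 0;
        private final int[] privateReuse = new int[1]; // own reuse object
        private int[] head;

        HeadStream(int[] input) { this.input = input; }

        // Note: no reuse parameter needed any more, as the issue suggests.
        boolean nextHead() {
            if (pos >= input.length) return false;
            privateReuse[0] = input[pos++];  // deserialize into OWN object,
            head = privateReuse;             // never into the caller's reuse
            return true;
        }

        int[] getHead() { return head; }
    }

    public static void main(String[] args) {
        HeadStream s = new HeadStream(new int[] {7, 8});
        int[] callerReuse = new int[1];
        s.nextHead();
        // The stored head is the stream's private object, not callerReuse,
        // so later mutation of callerReuse cannot corrupt the head.
        System.out.println(s.getHead() == callerReuse); // false
        System.out.println(s.getHead()[0]);             // 7
    }
}
```

Since every HeadStream owns its reuse object, two heads in the heap can never alias the same caller-provided object, which is exactly the failure mode described above.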
[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)
[ https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-3394:
-------------------------------
    Description:
{{MutableObjectIterator.next(reuse)}} has the following contract (according to [~StephanEwen]'s comment \[1\]):
1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.
This should be documented in its javadoc (with "WARNING" so that people don't overlook it).
Additionally, since this was a "secret contract" up to now, all the 270 usages of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A few that are suspicious at first glance are in {{CrossDriver}}, {{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls in the reduce drivers are being fixed by https://github.com/apache/flink/pull/1626 )

\[1\] https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654
[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-3333:
-------------------------------
    Priority: Blocker  (was: Critical)

> Documentation about object reuse should be improved
> ---------------------------------------------------
>
>                 Key: FLINK-3333
>                 URL: https://issues.apache.org/jira/browse/FLINK-3333
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Blocker
>             Fix For: 1.0.0
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139018#comment-15139018 ]

Gabor Gevay commented on FLINK-3333:
------------------------------------

I have raised the priority to Blocker, because it seems that there is no consensus about what the contracts should be regarding object reuse. See https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139219#comment-15139219 ]

Gabor Gevay commented on FLINK-3333:
------------------------------------

OK, this would also be a clear improvement over the current documentation. However, this doesn't discuss the rules about whether I can modify output objects after returning them. This question is far from trivial (as evidenced by the comment thread in the Google Doc). Do you think that this "edge case" is too insignificant to mention?

> I think it is most important that the user documentation be clear and concise, otherwise it won't be read or will discourage new users.

This is why I think that the separation between the non-chained/chained cases should be left out of this. If we are aiming for simplicity here, then I really can't imagine a user meticulously checking whether their operator will be chained, and then writing different code based on this.

Other minor issues:

> In practice that means that a user function will always receive the same object instance

"it can happen that" should be inserted. See point C) in the "Concrete problems with the current documentation" section in the Google Doc.

> User defined functions (like map() or groupReduce())

Should be "like MapFunction or GroupReduceFunction", to avoid confusing new users about what a user defined function is.
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139435#comment-15139435 ] Gabor Gevay commented on FLINK-3333: One more thing: if we decide to go with including chaining, then please also explain that a "chainable operator" means that it can be chained with the _previous_ operator. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3333: --- Priority: Major (was: Minor) > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
[ https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136812#comment-15136812 ] Gabor Gevay commented on FLINK-3266: These are two thread dumps, one before running the job, and one after: http://pastebin.com/aT9zMf2f http://pastebin.com/nLzicKad The diff between them are these threads: ForkJoinPool-4-worker-7 ForkJoinPool-1-worker-3 ForkJoinPool-3-worker-5 Timer-0 Timer-1 > LocalFlinkMiniCluster leaks resources when multiple jobs are submitted > -- > > Key: FLINK-3266 > URL: https://issues.apache.org/jira/browse/FLINK-3266 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Minor > > After a job submitted to a LocalEnvironment finishes, some threads are not > stopped, and are stuck in waiting forever. > You can observe this, if you enclose the body of the main function of the > WordCount example with a loop that executes 100 times, and monitor the thread > count (with VisualVM for example). > (The problem only happens if I use a mini cluster. If I use start-local.sh > and submit jobs to it, then there is no leak.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3363) JobManager does not shut down dedicated executor properly
[ https://issues.apache.org/jira/browse/FLINK-3363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136813#comment-15136813 ] Gabor Gevay commented on FLINK-3363: Is this a different issue from https://issues.apache.org/jira/browse/FLINK-3266 ? > JobManager does not shut down dedicated executor properly > - > > Key: FLINK-3363 > URL: https://issues.apache.org/jira/browse/FLINK-3363 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 0.10.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.0.0 > > > The JobManager does not shutdown its dedicated executor thread pool. > In standalone/Yarn setups, that is no issue. In local embedded execution, > this may leave threads running that are not needed any more. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3333: --- Priority: Critical (was: Major) > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3333: --- Component/s: Documentation > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134152#comment-15134152 ] Gabor Gevay commented on FLINK-3291: @Greg, my current problem is that I'm not sure what the exact contract for `MutableObjectIterator.next(reuse)` should be. The fix that I have so far \[1\] makes 3b "yes" (see \[2\]), but it seems that you would rather go for "no", so that it is possible to handle object reuse by "exchanging the given object for an existing object" as you say in your comment \[3\]. Do you think that the performance gain (avoiding a copy) makes the complications that arise at every use of `MutableObjectIterator.next(reuse)` worthwhile? \[1\] https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1 \[2\] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit \[3\] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java#L70 > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller. > This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object. > You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times. > You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. > The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
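The aliasing described above can be boiled down to a minimal sketch. The classes below are hypothetical stand-ins, not Flink's actual MergeIterator: a stream that stores the caller's reuse object as its head sees the head change whenever the caller later mutates that object, while a stream that owns its own reuse object (the fix proposed in the description) is unaffected.

```java
// Minimal, hypothetical sketch of the reuse-aliasing bug (illustrative names,
// not Flink's actual classes).
class Record {
    int key;
    Record(int key) { this.key = key; }
}

// BUG: nextHead stores the reference to the caller's reuse object, so any
// later mutation by the caller silently changes `head` too.
class BuggyHeadStream {
    Record head;
    Record nextHead(Record reuse) {
        reuse.key = 1;    // pretend we deserialized the next element into reuse
        head = reuse;     // aliasing: head == reuse
        return head;
    }
}

// FIX: the stream owns a reuse object of its own, so the caller's object is
// never aliased.
class FixedHeadStream {
    private final Record ownReuse = new Record(0);
    Record head;
    Record nextHead(Record callerReuse) {
        ownReuse.key = 1; // deserialize into the stream-owned object instead
        head = ownReuse;
        return head;
    }
}

public class ReuseAliasingDemo {
    public static void main(String[] args) {
        Record callerReuse = new Record(0);

        BuggyHeadStream buggy = new BuggyHeadStream();
        buggy.nextHead(callerReuse);
        callerReuse.key = 42;                 // caller reuses its object...
        System.out.println(buggy.head.key);   // ...and corrupts head: prints 42

        FixedHeadStream fixed = new FixedHeadStream();
        fixed.nextHead(callerReuse);
        callerReuse.key = 99;
        System.out.println(fixed.head.key);   // head unaffected: prints 1
    }
}
```

As the description notes, with a stream-owned reuse object the reuse-taking overload of nextHead becomes unnecessary.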
[jira] [Created] (FLINK-3333) Documentation about object reuse should be improved
Gabor Gevay created FLINK-3333: -- Summary: Documentation about object reuse should be improved Key: FLINK-3333 URL: https://issues.apache.org/jira/browse/FLINK-3333 Project: Flink Issue Type: Bug Affects Versions: 1.0.0 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Fix For: 1.0.0 The documentation about object reuse \[1\] has several problems, see \[2\]. \[1\] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior \[2\] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3335) DataSourceTask object reuse when disabled
[ https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132453#comment-15132453 ] Gabor Gevay commented on FLINK-3335: Are you sure that adding that copy is the best solution here? An alternative solution would be to specify in the javadoc of InputFormat.nextRecord that it should satisfy the same contract that also governs how UDFs like MapFunction treat objects that they return or give to Collector.collect. (See here: https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit) > DataSourceTask object reuse when disabled > - > > Key: FLINK-3335 > URL: https://issues.apache.org/jira/browse/FLINK-3335 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > From {{DataSourceTask.invoke()}}: > {code} > if ((returned = format.nextRecord(serializer.createInstance())) != null) { > output.collect(returned); > } > {code} > The returned value ({{returned}}) must be copied rather than creating and > passing in a new instance. The {{InputFormat}} interface only permits the > given object to be used and does not require a new object to be returned > otherwise. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
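The tradeoff under discussion can be sketched as follows. This is a hypothetical stand-in, not Flink's actual DataSourceTask or InputFormat: if a format mutates and returns the same instance on every call (which the contract permits), then with object reuse disabled the driver must copy the returned value before collecting it, otherwise every collected element aliases the one object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the pattern under discussion (not Flink's actual
// DataSourceTask/InputFormat).
public class CopyOnCollectSketch {
    static final class Rec {
        int f0;
        Rec(int f0) { this.f0 = f0; }
    }

    // Legitimately mutates and returns the reuse object, as the InputFormat
    // contract permits.
    static Rec nextRecord(Rec reuse, int value) {
        reuse.f0 = value;
        return reuse;
    }

    public static void main(String[] args) {
        Rec reuse = new Rec(0);

        // Without copying, every collected element aliases `reuse`:
        List<Rec> aliased = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            aliased.add(nextRecord(reuse, i));
        }
        System.out.println(aliased.get(0).f0);  // prints 3, not 1 -- corrupted

        // With a defensive copy, each collected element keeps its own value:
        List<Rec> copied = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            copied.add(new Rec(nextRecord(reuse, i).f0));
        }
        System.out.println(copied.get(0).f0);   // prints 1
    }
}
```

The alternative raised in the comment would instead tighten the InputFormat contract so that the format may not reuse an object it has already handed downstream, which avoids the copy at the cost of a stricter rule for format authors.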
[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130452#comment-15130452 ] Gabor Gevay commented on FLINK-3291: OK, sorry, I think I have now understood what you meant by ReduceDriver.run not tracking the object returned from the iterator call. The problem here is that after 0a8df6d513fa59d650ff875bdf3a1613d0f14af5, I mustn't modify an object that I have given to an iterator.next call as a reuse object, because MergeIterator.HeadStream.nextHead saves a reference to it, and expects that object not to change. But this seems like a rather scary requirement, and I can't be sure that some other code besides ReduceDriver doesn't also violate it somewhere. I think the root cause of these issues is that the documentation about object reuse [1] is rather inadequate at clearly stating what the contracts in this area are, so I tried to put together a Google Doc about this: [2]. Could you please look at it, and tell me how much it aligns with your way of thinking about object reuse? [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior [2] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce. > Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object. > You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times. > You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. > The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
Gabor Gevay created FLINK-3322: -- Summary: MemoryManager creates too much GC pressure with iterative jobs Key: FLINK-3322 URL: https://issues.apache.org/jira/browse/FLINK-3322 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Gabor Gevay When taskmanager.memory.preallocate is false (the default), released memory segments are not added to a pool, but the GC is expected to take care of them. This puts too much pressure on the GC with iterative jobs, where the operators reallocate all memory at every superstep. See the following discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html Reproducing the issue: https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc The class to start is malom.Solver. If you increase the memory given to the JVM from 1 to 50 GB, performance gradually degrades by more than a factor of 10. (It will generate some lookup tables to /tmp on first run for a few minutes.) (I think the slowdown might also depend somewhat on taskmanager.memory.fraction, because more unused non-managed memory results in rarer GCs.)
[jira] [Updated] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it
[ https://issues.apache.org/jira/browse/FLINK-3321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3321: --- Labels: starter (was: ) > TupleSerializerBase.getLength should know the length when all fields know it > > > Key: FLINK-3321 > URL: https://issues.apache.org/jira/browse/FLINK-3321 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Gabor Gevay >Priority: Minor > Labels: starter > > TupleSerializerBase.getLength currently always returns -1, but it could > actually know the length, when all the field serializers know their lengths > (since no null can appear anywhere in Tuples, nor can a subclass of Tuple > with additional fields appear). > (The serializer knowing the exact size has various performance benefits, for > example see FixedLengthRecordSorter, or > CompactingHashTable.getInitialTableSize.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3322: --- Fix Version/s: 1.0.0 > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than a factor of > 10. (It will generate some lookup tables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.)
[jira] [Created] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it
Gabor Gevay created FLINK-3321: -- Summary: TupleSerializerBase.getLength should know the length when all fields know it Key: FLINK-3321 URL: https://issues.apache.org/jira/browse/FLINK-3321 Project: Flink Issue Type: Improvement Components: Java API Reporter: Gabor Gevay Priority: Minor TupleSerializerBase.getLength currently always returns -1, but it could actually know the length, when all the field serializers know their lengths (since no null can appear anywhere in Tuples, nor can a subclass of Tuple with additional fields appear). (The serializer knowing the exact size has various performance benefits, for example see FixedLengthRecordSorter, or CompactingHashTable.getInitialTableSize.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
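The proposal above reduces to a simple combination rule, sketched here with a hypothetical helper (illustrative, not the actual TupleSerializerBase code): the tuple's serialized length is the sum of the field serializers' lengths when all of them are fixed, and -1 (unknown) as soon as any field is variable-length. This is valid precisely because, as the issue notes, no null can appear in a Tuple and no subclass with extra fields gets serialized.

```java
// Hypothetical sketch of the proposed getLength() behavior (illustrative,
// not the actual TupleSerializerBase code). By convention, -1 means
// "variable / unknown length", mirroring TypeSerializer.getLength().
public class TupleLengthSketch {
    static int combinedLength(int[] fieldSerializerLengths) {
        int sum = 0;
        for (int len : fieldSerializerLengths) {
            if (len < 0) {
                return -1; // one variable-length field makes the whole tuple variable-length
            }
            sum += len;
        }
        return sum;
    }

    public static void main(String[] args) {
        // e.g. a tuple of an int-like (4 bytes) and a long-like (8 bytes) field:
        System.out.println(combinedLength(new int[] {4, 8}));  // prints 12
        // e.g. a tuple containing a variable-length field such as a String:
        System.out.println(combinedLength(new int[] {4, -1})); // prints -1
    }
}
```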
[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128243#comment-15128243 ] Gabor Gevay commented on FLINK-3291: > `ReduceDriver.run` owns two objects but is not tracking the object returned > in `while ((value = input.next(reuse2)) != null) {` The problem is more complicated than that: this can't be done with just 1 or 2 reuse objects; each of the head streams needs to own an object itself. > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller. > This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce. > Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object. > You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times.
> You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. > The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128225#comment-15128225 ] Gabor Gevay commented on FLINK-3291: I'm almost done with the fix, see here: https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1 The only thing that is missing is adding a small test for specifically this problem. And I would like to additionally figure out why the tests that are already in place didn't catch this problem. Btw. I couldn't avoid introducing an extra copy. (The solution that I wrote in the Jira description, doesn't really work in itself.) > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller. > This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce. > Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object.
> You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times. > You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. > The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128225#comment-15128225 ] Gabor Gevay edited comment on FLINK-3291 at 2/2/16 1:14 PM: I'm almost done with the fix, see here: https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1 The only thing that is missing is adding a small test for specifically this problem. And I would like to additionally figure out why the tests that are already in place didn't catch this problem. Btw. I couldn't avoid introducing an extra copy. (The solution that I wrote in the Jira description, doesn't really work in itself.) was (Author: ggevay): I'm almost done with the fix, see here: https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1 The only thing that is missing is adding a small test for specifically this problem. And I would like to additionally figure out why the test that are already in place didn't catch this problem. Btw. I couldn't avoid introducing an extra copy. (Th solution that I wrote in the Jira description, doesn't really work in itself.) > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller. > This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce. 
> Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object. > You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times. > You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. > The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
Gabor Gevay created FLINK-3291: -- Summary: Object reuse bug in MergeIterator.HeadStream.nextHead Key: FLINK-3291 URL: https://issues.apache.org/jira/browse/FLINK-3291 Project: Flink Issue Type: Bug Affects Versions: 1.0.0 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Critical MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the `reuse` object that it got as an argument. This object might be modified later by the caller. This actually happens when ReduceDriver.run calls input.next (which will actually be MergeIterator.next(E reuse)) in the inner while loop of the objectReuseEnabled branch, and that calls top.nextHead with the reference that it got from ReduceDriver, which erroneously saves the reference, and then ReduceDriver later uses that same object for doing the reduce. Another way in which this fails is when MergeIterator.next(E reuse) gives `reuse` to different `top`s in different calls, and then the heads end up being the same object. You can observe the latter situation in action by running ReducePerformance here: https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug Set memory to -Xmx200m (so that the MergeIterator actually has merging to do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then watch `reuse`, and the heads of the first two elements of `this.heap` in the debugger. They will get to be the same object after hitting continue about 6 times. You can also look at the count that is printed at the end, which shouldn't be larger than the key range. Also, if you look into the output file /tmp/xxxobjectreusebug, for example the key 77 appears twice. The good news is that I think I can see an easy fix that doesn't affect performance: MergeIterator.HeadStream could have a reuse object of its own as a member, and give that to iterator.next in nextHead(E reuse). 
And then we wouldn't need the overload of nextHead that has the reuse parameter, and MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
[ https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3291: --- Component/s: Distributed Runtime > Object reuse bug in MergeIterator.HeadStream.nextHead > - > > Key: FLINK-3291 > URL: https://issues.apache.org/jira/browse/FLINK-3291 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Critical > > MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the > `reuse` object that it got as an argument. This object might be modified > later by the caller. > This actually happens when ReduceDriver.run calls input.next (which will > actually be MergeIterator.next(E reuse)) in the inner while loop of the > objectReuseEnabled branch, and that calls top.nextHead with the reference > that it got from ReduceDriver, which erroneously saves the reference, and > then ReduceDriver later uses that same object for doing the reduce. > Another way in which this fails is when MergeIterator.next(E reuse) gives > `reuse` to different `top`s in different calls, and then the heads end up > being the same object. > You can observe the latter situation in action by running ReducePerformance > here: > https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug > Set memory to -Xmx200m (so that the MergeIterator actually has merging to > do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then > watch `reuse`, and the heads of the first two elements of `this.heap` in the > debugger. They will get to be the same object after hitting continue about 6 > times. > You can also look at the count that is printed at the end, which shouldn't be > larger than the key range. Also, if you look into the output file > /tmp/xxxobjectreusebug, for example the key 77 appears twice. 
> The good news is that I think I can see an easy fix that doesn't affect > performance: MergeIterator.HeadStream could have a reuse object of its own as > a member, and give that to iterator.next in nextHead(E reuse). And then we > wouldn't need the overload of nextHead that has the reuse parameter, and > MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
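The aliasing failure mode described above can be sketched outside Flink with a minimal stand-in for HeadStream (class and member names here are illustrative, not Flink's actual API): two head streams that save the caller's shared reuse object end up holding the same head, while the proposed stream-owned reuse object avoids this.

```java
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {
    static class Box { int v; }

    // Buggy pattern: nextHead stores the caller's reuse object in this.head.
    static class BuggyHead {
        Box head;
        boolean nextHead(Iterator<Integer> it, Box reuse) {
            if (!it.hasNext()) return false;
            reuse.v = it.next();
            this.head = reuse;  // bug: the caller may hand this same object to other streams
            return true;
        }
    }

    // Proposed fix: the stream owns its reuse object, so the overload
    // taking a reuse parameter is no longer needed.
    static class FixedHead {
        private final Box own = new Box();
        Box head;
        boolean nextHead(Iterator<Integer> it) {
            if (!it.hasNext()) return false;
            own.v = it.next();
            this.head = own;
            return true;
        }
    }

    public static void main(String[] args) {
        Box reuse = new Box();
        BuggyHead a = new BuggyHead(), b = new BuggyHead();
        a.nextHead(List.of(1).iterator(), reuse);
        b.nextHead(List.of(2).iterator(), reuse);  // same reuse object passed to a different "top"
        System.out.println("buggy heads alias: " + (a.head == b.head));  // true

        FixedHead c = new FixedHead(), d = new FixedHead();
        c.nextHead(List.of(1).iterator());
        d.nextHead(List.of(2).iterator());
        System.out.println("fixed heads alias: " + (c.head == d.head));  // false
    }
}
```

This mirrors the second failure mode in the report: when `next(E reuse)` hands the same `reuse` to different heap elements, their heads become the same object.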
[jira] [Commented] (FLINK-2246) Add chained combine driver strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-2246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115174#comment-15115174 ] Gabor Gevay commented on FLINK-2246: I have added a commit to https://github.com/apache/flink/pull/1517 , which adds the chained version of ReduceCombineDriver. > Add chained combine driver strategy for ReduceFunction > -- > > Key: FLINK-2246 > URL: https://issues.apache.org/jira/browse/FLINK-2246 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.10.0 >Reporter: Ufuk Celebi >Assignee: Gabor Gevay >Priority: Minor > > Running the WordCount example with text file input/output and a manual reduce function (instead of the sum(1)) results in a combiner which is not chained. > Replace sum(1) with the following to reproduce, and use a text file as input: > {code} > fileOutput = true; > textPath = "..."; > outputPath = "..."; > {code} > {code} > .reduce(new ReduceFunction<Tuple2<String, Integer>>() { > @Override > public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, > Tuple2<String, Integer> value2) throws Exception { > return new Tuple2<>(value1.f0, value1.f1 + value2.f1); > } > }); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2246) Add chained combine driver strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-2246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay reassigned FLINK-2246: -- Assignee: Gabor Gevay > Add chained combine driver strategy for ReduceFunction > -- > > Key: FLINK-2246 > URL: https://issues.apache.org/jira/browse/FLINK-2246 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.10.0 >Reporter: Ufuk Celebi >Assignee: Gabor Gevay >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
[ https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110349#comment-15110349 ] Gabor Gevay commented on FLINK-3266: They have names like `ForkJoinPool-168-worker-3` and `Timer-13`. Here is a thread dump: http://pastebin.com/h807AcHH > LocalFlinkMiniCluster leaks resources when multiple jobs are submitted > -- > > Key: FLINK-3266 > URL: https://issues.apache.org/jira/browse/FLINK-3266 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Minor > > After a job submitted to a LocalEnvironment finishes, some threads are not > stopped, and are stuck in waiting forever. > You can observe this, if you enclose the body of the main function of the > WordCount example with a loop that executes 100 times, and monitor the thread > count (with VisualVM for example). > (The problem only happens if I use a mini cluster. If I use start-local.sh > and submit jobs to it, then there is no leak.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
[ https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108861#comment-15108861 ] Gabor Gevay commented on FLINK-3266: Threads staying alive are the most apparent, but there is some memory as well (maybe about 200 KB per job). > LocalFlinkMiniCluster leaks resources when multiple jobs are submitted > -- > > Key: FLINK-3266 > URL: https://issues.apache.org/jira/browse/FLINK-3266 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
Gabor Gevay created FLINK-3266: -- Summary: LocalFlinkMiniCluster leaks resources when multiple jobs are submitted Key: FLINK-3266 URL: https://issues.apache.org/jira/browse/FLINK-3266 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.0.0 Reporter: Gabor Gevay Priority: Minor After a job submitted to a LocalEnvironment finishes, some threads are not stopped, and are stuck in waiting forever. You can observe this, if you enclose the body of the main function of the WordCount example with a loop that executes 100 times, and monitor the thread count (with VisualVM for example). (The problem only happens if I use a mini cluster. If I use start-local.sh and submit jobs to it, then there is no leak.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
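The leak can be observed generically by watching the JVM thread count across repeated job submissions. A minimal sketch, assuming a stub `runJob()` that leaks one parked thread per call (the real reproduction would instead submit WordCount to a `LocalEnvironment` in a loop):

```java
import java.util.concurrent.CountDownLatch;

public class LeakDemo {
    // Stand-in for a job submission that forgets to stop a worker thread.
    static void runJob() {
        Thread t = new Thread(() -> {
            try {
                new CountDownLatch(1).await();  // waits forever, like the leaked threads
            } catch (InterruptedException ignored) {
            }
        });
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) {
        int before = Thread.activeCount();
        for (int i = 0; i < 100; i++) {
            runJob();
        }
        int after = Thread.activeCount();
        // With a leak, the count grows with the number of jobs instead of staying flat.
        System.out.println("thread count grew: " + (after > before));
    }
}
```

A healthy mini cluster should keep the count flat over the 100 iterations; monitoring it with VisualVM as suggested above shows the same growth.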
[jira] [Created] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds
Gabor Gevay created FLINK-3166: -- Summary: The first program in ObjectReuseITCase has the wrong expected result, and it succeeds Key: FLINK-3166 URL: https://issues.apache.org/jira/browse/FLINK-3166 Project: Flink Issue Type: Bug Reporter: Gabor Gevay Priority: Critical The first program in ObjectReuseITCase has the following input: a,1 a,2 a,3 a,4 a,50 There is a groupBy on field 0, and then a reduce, so the result should be 1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the Flink program also produces this. The problem is caused by mismatched assumptions between ReduceCombineDriver.sortAndCombine and the ReduceFunction in the test about the object reuse rules of ReduceFunctions: ReduceCombineDriver.sortAndCombine has the following comment: "The user function is expected to return the first input as the result." But the ReduceFunction in the test modifies and returns the second input. (And the second program in the test has the same problem.) I can't find the assumption that is stated in the comment in any documentation. The javadoc of ReduceFunction, for example, should make the user aware of this. Or, alternatively, the code of the driver should be modified to not make this assumption. I'm not sure which solution is better. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds
[ https://issues.apache.org/jira/browse/FLINK-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3166: --- Component/s: Tests > The first program in ObjectReuseITCase has the wrong expected result, and it succeeds > - > > Key: FLINK-3166 > URL: https://issues.apache.org/jira/browse/FLINK-3166 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Gabor Gevay >Priority: Critical -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds
[ https://issues.apache.org/jira/browse/FLINK-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3166: --- Component/s: Documentation Distributed Runtime > The first program in ObjectReuseITCase has the wrong expected result, and it succeeds > - > > Key: FLINK-3166 > URL: https://issues.apache.org/jira/browse/FLINK-3166 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime, Documentation, Tests >Reporter: Gabor Gevay >Priority: Critical -- This message was sent by Atlassian JIRA (v6.3.4#6332)
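The 100-instead-of-60 result reported for FLINK-3166 can be reproduced outside Flink with a toy model of the combiner's object-reuse pattern (`ReduceFn` and `drive` are illustrative stand-ins, not Flink classes): the driver rereads each input record into the same object and assumes the user function returned its first argument.

```java
public class ReduceContractDemo {
    static class Rec { int v; }

    interface ReduceFn { Rec reduce(Rec a, Rec b); }

    // Toy combiner: reuses one object for the current record, like sortAndCombine.
    static int drive(int[] in, ReduceFn f) {
        Rec acc = new Rec();
        acc.v = in[0];
        Rec cur = new Rec();
        for (int i = 1; i < in.length; i++) {
            cur.v = in[i];              // each record is "deserialized" into the same object
            acc = f.reduce(acc, cur);   // contract: f is expected to return its FIRST input
        }
        return acc.v;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 50};  // the test's input values for key "a"

        // Obeys the contract: modifies and returns the first input.
        System.out.println(drive(values, (a, b) -> { a.v += b.v; return a; }));

        // Violates it: returns the second input, which the driver then overwrites,
        // so the accumulated value is lost and the last record gets doubled.
        System.out.println(drive(values, (a, b) -> { b.v += a.v; return b; }));
    }
}
```

The first call prints 60 (the correct sum); the second prints 100, matching both the hardcoded expectation and the actual Flink output described in the issue.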
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15006330#comment-15006330 ] Gabor Gevay commented on FLINK-2662: Hi, Unfortunately, I haven't made any progress. It is still reproducible, I have now rebased to the current master, and the same thing happens when I try it on my home laptop. This is the rebased branch: https://github.com/ggevay/flink/tree/plan-generation-bug-rebased > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10 >Reporter: Gabor Gevay > Fix For: 0.10 > > > I have a Flink program which throws the exception in the jira title. Full > text: > Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: > Plan generation for Unions picked a ship strategy between binary plan > operators. 
> at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) > at malom.Solver.main(Solver.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > The execution plan: > http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt > (I obtained this by commenting out the line that throws the exception) > The code is here: > https://github.com/ggevay/flink/tree/plan-generation-bug > The class to run is "Solver". It needs a command line argument, which is a > directory where it would write output. (On first run, it generates some > lookuptables for a few minutes, which are then placed to /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2757) DataSinkTaskTest fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-2757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15002124#comment-15002124 ] Gabor Gevay commented on FLINK-2757: This might be related to https://issues.apache.org/jira/browse/FLINK-2369. > DataSinkTaskTest fails on Windows > - > > Key: FLINK-2757 > URL: https://issues.apache.org/jira/browse/FLINK-2757 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10 > Environment: Windows 10, Cygwin, "mvn clean install" >Reporter: Fabian Hueske > Fix For: 0.10 > > > The following tests of {{DataSinkTaskTest}} fail when calling {{mvn clean > install}} in Cygwin on Windows 10: > {code} > DataSinkTaskTest.testFailingDataSinkTask: Temp output file has not been > removed > DataSinkTaskTest.testFailingSortingDataSinkTask: Temp output file has not > been removed > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14978124#comment-14978124 ] Gabor Gevay commented on FLINK-2662: It is working on my work laptop... > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10 >Reporter: Gabor Gevay > Fix For: 0.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14978675#comment-14978675 ] Gabor Gevay commented on FLINK-2662: Yes, because I checked out the branch that is referred here in both cases. > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10 >Reporter: Gabor Gevay > Fix For: 0.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14976982#comment-14976982 ] Gabor Gevay commented on FLINK-2662: That's strange. I have also tried to reproduce this now, and it is still giving the same error for me. Do you have any idea as to what characteristic of my machine might influence the plan generation in a way that this error happens only for me? Tomorrow I will try this on my work laptop as well, and report back. > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10 >Reporter: Gabor Gevay > Fix For: 0.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2847) Fix flaky test in StormTestBase.testJob
[ https://issues.apache.org/jira/browse/FLINK-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2847: --- Labels: test-stability (was: ) > Fix flaky test in StormTestBase.testJob > --- > > Key: FLINK-2847 > URL: https://issues.apache.org/jira/browse/FLINK-2847 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Henry Saputra >Priority: Minor > Labels: test-stability > > {code} > testJob(org.apache.flink.storm.wordcount.WordCountLocalITCase) Time elapsed: > 12.845 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<801> but was:<0> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:555) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:305) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:291) > at > org.apache.flink.storm.wordcount.WordCountLocalITCase.postSubmit(WordCountLocalITCase.java:38) > Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.815 sec - > in org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNamesITCase > Running org.apache.flink.storm.wordcount.BoltTokenizerWordCountITCase > Running org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojoITCase > Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.55 sec - in > org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojoITCase > Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.801 sec - > in org.apache.flink.storm.wordcount.BoltTokenizerWordCountITCase > Results : > Failed tests: > > WordCountLocalITCase>StormTestBase.testJob:98->postSubmit:38->TestBaseUtils.compareResultsByLinesInMemory:291->TestBaseUtils.compareResultsByLinesInMemory:305 > Different number of lines in expected and 
obtained result. expected:<801> > but was:<0> > Tests run: 11, Failures: 1, Errors: 0, Skipped: 0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2839) Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics
Gabor Gevay created FLINK-2839: -- Summary: Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics Key: FLINK-2839 URL: https://issues.apache.org/jira/browse/FLINK-2839 Project: Flink Issue Type: Bug Components: flink-contrib Reporter: Gabor Gevay Priority: Minor I saw this test failure: {code} Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.633 sec <<< FAILURE! - in org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest testAccumulatorAllStatistics(org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest) Time elapsed: 1.5 sec <<< FAILURE! java.lang.AssertionError: The total number of heavy hitters should be between 0 and 5. at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest.testAccumulatorAllStatistics(OperatorStatsAccumulatorTest.java:151) {code} Full log [here|https://s3.amazonaws.com/archive.travis-ci.org/jobs/84469788/log.txt]. Maybe the test should set a constant seed to the {{Random}} object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
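The suggested fix is the standard one for randomized tests: construct the test's Random with a fixed seed so every run draws the same sequence (the seed value below is arbitrary). A minimal sketch of why this makes the test deterministic:

```java
import java.util.Random;

public class SeededRandomDemo {
    public static void main(String[] args) {
        // Two generators with the same seed produce identical sequences,
        // so a seeded test cannot fail only on unlucky draws.
        Random r1 = new Random(42);
        Random r2 = new Random(42);
        boolean same = true;
        for (int i = 0; i < 1000; i++) {
            same &= r1.nextInt(100) == r2.nextInt(100);
        }
        System.out.println("deterministic: " + same);
    }
}
```

If the heavy-hitter bound genuinely holds only with high probability, a fixed seed pins the test to a known-good draw instead of failing intermittently on CI.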
[jira] [Created] (FLINK-2818) The *ReduceDriver classes have incorrect javadocs
Gabor Gevay created FLINK-2818: -- Summary: The *ReduceDriver classes have incorrect javadocs Key: FLINK-2818 URL: https://issues.apache.org/jira/browse/FLINK-2818 Project: Flink Issue Type: Bug Components: Distributed Runtime Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The javadocs of ReduceDriver, AllReduceDriver, AllGroupReduceDriver, GroupReduceDriver, GroupReduceCombineDriver, and GroupCombineChainedDriver are outdated and/or copy-pasted-then-not-correctly-modified, which makes deciphering what all these classes are more difficult. PR coming shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay reassigned FLINK-2237: -- Assignee: Gabor Gevay > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Assignee: Gabor Gevay >Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2809) DataSet[Unit] doesn't work
Gabor Gevay created FLINK-2809: -- Summary: DataSet[Unit] doesn't work Key: FLINK-2809 URL: https://issues.apache.org/jira/browse/FLINK-2809 Project: Flink Issue Type: Bug Components: Scala API Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The following code creates a DataSet\[Unit\]:
{code}
val env = ExecutionEnvironment.createLocalEnvironment()
val a = env.fromElements(1,2,3)
val b = a.map (_ => ())
b.writeAsText("/tmp/xxx")
env.execute()
{code}
This doesn't work, because a VoidSerializer is created, which can't cope with a BoxedUnit. See the exception below. I'm now thinking about creating a UnitSerializer class.
{noformat}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.lang.Void
	at org.apache.flink.api.common.typeutils.base.VoidSerializer.serialize(VoidSerializer.java:26)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
	at java.lang.Thread.run(Thread.java:745)
{noformat}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
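A UnitSerializer along these lines could be trivial, since Unit has exactly one value: serialization can write zero bytes and deserialization can return the singleton. Below is a minimal plain-Java sketch of that idea; the class and method names are stand-ins, not Flink's actual TypeSerializer interface, and the UNIT object stands in for scala.runtime.BoxedUnit.UNIT.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical sketch (not Flink's actual API): a unit value carries no
// information, so a serializer for it can write zero bytes and always
// return the same singleton on read -- unlike VoidSerializer, which
// expects null/Void and throws ClassCastException on a BoxedUnit.
public class UnitSerializerSketch {
    // Stand-in for scala.runtime.BoxedUnit.UNIT
    public static final Object UNIT = new Object();

    public static void serialize(Object unit, DataOutputStream out) {
        // Nothing to write: the type has exactly one value.
    }

    public static Object deserialize(DataInputStream in) {
        // Nothing to read; just hand back the singleton.
        return UNIT;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        serialize(UNIT, new DataOutputStream(buf));
        Object back = deserialize(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(buf.size() + " bytes written, same instance: " + (back == UNIT));
    }
}
```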
[jira] [Created] (FLINK-2806) No TypeInfo for Scala's Nothing type
Gabor Gevay created FLINK-2806: -- Summary: No TypeInfo for Scala's Nothing type Key: FLINK-2806 URL: https://issues.apache.org/jira/browse/FLINK-2806 Project: Flink Issue Type: Bug Components: Scala API Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor When writing some generic code, I encountered a situation where I needed a TypeInformation[Nothing]. Two problems prevent me from getting it: 1. TypeInformationGen.mkTypeInfo doesn't return a real TypeInformation[Nothing]. (It actually returns a casted null in that case.) 2. The line implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] does not fire in some situations when it should, when T = Nothing. (I guess this is a compiler bug.) I will open a PR shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14909174#comment-14909174 ] Gabor Gevay commented on FLINK-2237: Couldn't the CompactingHashTable be reused here? The driver would get a prober from it, and then for each incoming record, the driver would call getMatchFor, then do one step of the reduce, and then write the result back with updateMatch. I'm interested in this feature because I have a computation where it would really make a huge difference: I'm calling flatMap on an already large data set, which blows it up to about 10 times the size, and then groupBy and reduce. If Flink had hash-based aggregation, then the result of the flatMap wouldn't need to be materialized, which would reduce the memory requirement to about 1/10. > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
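The probe-and-update loop described in the comment above (getMatchFor, one step of the reduce, updateMatch) can be sketched with a plain JDK hash map; the names and types here are illustrative, not Flink's CompactingHashTable API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustrative sketch only (plain JDK, not Flink's hash table): for each
// incoming record, probe the table for the current aggregate of its key,
// apply one step of the reduce function, and write the result back.
// The input never needs to be sorted or fully materialized.
public class HashAggregateSketch {
    public static Map<String, Integer> aggregate(
            List<Map.Entry<String, Integer>> records,
            BinaryOperator<Integer> reduce) {
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, Integer> rec : records) {
            // probe ("getMatchFor"), one reduce step, write back ("updateMatch")
            table.merge(rec.getKey(), rec.getValue(), reduce);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = aggregate(List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3)),
                Integer::sum);
        System.out.println(result.get("a") + " " + result.get("b")); // 4 2
    }
}
```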
[jira] [Created] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
Gabor Gevay created FLINK-2662: -- Summary: CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators." Key: FLINK-2662 URL: https://issues.apache.org/jira/browse/FLINK-2662 Project: Flink Issue Type: Bug Components: Optimizer Affects Versions: master Reporter: Gabor Gevay I have a Flink program which throws the exception in the jira title. Full text: Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators. at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) at 
org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) at malom.Solver.main(Solver.java:66) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) The execution plan: http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt (I obtained this by commenting out the line that throws the exception) The code is here: https://github.com/ggevay/flink/tree/plan-generation-bug The class to run is "Solver". It needs a command line argument, which is a directory where it will write its output. (On first run, it generates some lookup tables for a few minutes, which are then placed in /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2181) SessionWindowing example has a memleak
[ https://issues.apache.org/jira/browse/FLINK-2181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2181: --- Assignee: (was: Gabor Gevay) > SessionWindowing example has a memleak > -- > > Key: FLINK-2181 > URL: https://issues.apache.org/jira/browse/FLINK-2181 > Project: Flink > Issue Type: Bug > Components: Examples, Streaming >Reporter: Gabor Gevay > > The trigger policy objects belonging to already terminated sessions are kept > in memory, and also NotifyOnLastGlobalElement gets called on them. This > causes the program to eat up more and more memory, and also to get slower > with time. > Mailing list discussion: > http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay resolved FLINK-2548. Resolution: Won't Fix In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
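The complexity argument in the description above can be illustrated with a toy model that counts how many edge groups each formulation's UDF would be invoked on (plain JDK collections; the method names are illustrative, not Gelly code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (not Gelly): with 1000 source vertices and a workset of 3,
// the coGroup formulation invokes its UDF on every edge group, while the
// join + groupBy formulation only touches groups whose key survives the join.
public class WorksetScalingSketch {
    // coGroup: the messaging UDF runs once per edge group (per source
    // vertex), whether or not that vertex received a message.
    public static int coGroupCalls(Map<Integer, List<Integer>> edgesBySource,
                                   Set<Integer> workset) {
        return edgesBySource.size();
    }

    // join + groupBy: only groups whose source vertex is in the workset
    // survive the join, so only those reach the UDF.
    public static long joinGroupByCalls(Map<Integer, List<Integer>> edgesBySource,
                                        Set<Integer> workset) {
        return edgesBySource.keySet().stream().filter(workset::contains).count();
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> edges = new HashMap<>();
        for (int v = 0; v < 1000; v++) edges.put(v, List.of(v + 1));
        Set<Integer> workset = Set.of(1, 2, 3);
        System.out.println(coGroupCalls(edges, workset) + " vs "
                + joinGroupByCalls(edges, workset)); // 1000 vs 3
    }
}
```

This is why later iterations with small worksets see the largest speedups from the join-based formulation.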
[jira] [Commented] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708364#comment-14708364 ] Gabor Gevay commented on FLINK-2548: OK, you are probably right. I ran some more tests, and it seems that the issue in my use case is more with the serialization. In other cases, when the serialization of the vertex IDs is cheaper, the coGroup implementation does OK at keeping the run time of one iteration proportional to the workset size. In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. 
(They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2548: --- Summary: In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset (was: VertexCentricIteration should avoid doing a coGroup with the edges and the solution set) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. 
On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706579#comment-14706579 ] Gabor Gevay commented on FLINK-2548: I changed the title of the issue to refer to the goal instead of the above proposed solution approach, because I'm not sure anymore that this is the best approach to achieve this. My problem with the join + reduceGroup approach is that there are cases where it is a little slower (by a few tens of percent) than the coGroup, for example when the edges data set doesn't fit into memory, and probably also when the workset size is close to the number of vertices. Another approach would be to extend the coGroup operation to have an option which makes it call the UDF only for the matching keys (inner join), and the optimizer would consider using a hashtable instead of sorting if this option is set. But I am not sure how much work it would be to implement this. In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. 
An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704730#comment-14704730 ] Gabor Gevay commented on FLINK-2548: {quote} CoGrouping with the solution set is a different runtime operator than the regular CoGroup. {quote} I see, thanks! I think this should be documented in the iterations programming guide, and in the javadoc for coGroup and getSolutionSet. {quote} (2) To have an iterator for the message sending, you can group by source vertex. Then you group again by target vertex, to have an iterator for incoming messages (either reduce+join or CoGroup). So you have now join + reduce + reduce + join in the iteration? {quote} For calling the MessagingFunction, I join the edges with the workset, and then do a reduceGroup by source vertex. This gives me an iterator over the neighbors inside the UDF of the reduceGroup. (Well, almost. That iterator is a little different from what the messaging function expects, so I wrote an iterator adaptor.) See my code \[1\], and here is an execution plan which involves the new code: \[2\]. I think this code is memory-safe in the way you described. 
\[1\] https://github.com/apache/flink/compare/master...ggevay:gelly-remove-coGroup-first-version \[2\] http://compalg.inf.elte.hu/~ggevay/flink/Plan_join_groupBy.txt VertexCentricIteration should avoid doing a coGroup with the edges and the solution set --- Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
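The join + reduceGroup idea in the comment above can be sketched with plain JDK streams; this is a toy model (illustrative names, not the code in \[1\]) showing that joining the edges with the workset and then grouping by source vertex re-creates the per-vertex neighbor iterator that coGroup provided:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of join-then-groupBy: filter (the "join" with the workset)
// keeps only edges whose source vertex has a message, then groupingBy
// rebuilds the neighbor list per source vertex for the messaging UDF.
public class NeighborIteratorSketch {
    public static final class Edge {
        public final int src, dst;
        public Edge(int src, int dst) { this.src = src; this.dst = dst; }
    }

    public static Map<Integer, List<Integer>> neighborsForWorkset(
            List<Edge> edges, Set<Integer> workset) {
        return edges.stream()
                .filter(e -> workset.contains(e.src))       // the join
                .collect(Collectors.groupingBy(e -> e.src,  // the groupBy
                        Collectors.mapping(e -> e.dst, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(new Edge(1, 2), new Edge(1, 3), new Edge(4, 5));
        // Only vertex 1 is in the workset, so only its neighbors are grouped.
        System.out.println(neighborsForWorkset(edges, Set.of(1))); // {1=[2, 3]}
    }
}
```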
[jira] [Commented] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703423#comment-14703423 ] Gabor Gevay commented on FLINK-2548: {quote} Actually, the second co-group is not a real co-group. It only queries the solution set for the vertices that have a message. {quote} Can you please explain how it achieves this? Looking at the code, I can't see how it avoids getting called on every vertex. {quote} We used coGroup initially, because it fits the pregel model where you have an iterator over your neighbors. {quote} I do a groupBy after the join, so I have the same iterator. {quote} This could be realized by a join as well, although it is hard to realize that in a memory-safe fashion. {quote} What exactly do you mean here by memory-safe? I see one drawback of the join-then-groupBy approach memory-wise: the workset vertex-value tuples get replicated as many times as the vertex's out-degree. Did you mean this problem? {quote} Breaking this into three UDFs (Scatter / Gather / Apply), implemented as (Join, Reduce, Join) would work and give the efficiency you seek. The ConnectedComponents example follows pretty much that pattern. {quote} Thanks, I will look into this. VertexCentricIteration should avoid doing a coGroup with the edges and the solution set --- Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. 
This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set
Gabor Gevay created FLINK-2548: -- Summary: VertexCentricIteration should avoid doing a coGroup with the edges and the solution set Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex centric iteration is suboptimal in those iterations where the workset is small, because the complexity of one iteration contains the number of edges and vertices of the graph because of coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset, to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, which has the number of vertices of the graph included in its complexity. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The complexity of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. The joins should have the edges and the solution set at the build side to achieve this complexity. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads, this achieves a 2 times speedup, because later iterations often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop
[ https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2540: --- Component/s: Core LocalBufferPool.requestBuffer gets into infinite loop - Key: FLINK-2540 URL: https://issues.apache.org/jira/browse/FLINK-2540 Project: Flink Issue Type: Bug Components: Core Reporter: Gabor Gevay I'm trying to run a complicated computation that looks like this: [1]. One of the DataSource-Filter-Map chains finishes fine, but the other one freezes. Debugging shows that it is spinning in the while loop in LocalBufferPool.requestBuffer. askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize are 128, so neither of those if branches is ever taken. This is a stack trace: [2] And here is the code, if you would like to run it: [3]. Unfortunately, I can't make it more minimal, because if I remove some operators, the problem disappears. The class to start is malom.Solver. (On first run, it calculates some lookup tables for a few minutes, and puts them into /tmp/movegen) [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt [3] https://github.com/ggevay/flink/tree/deadlock-malom -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop
Gabor Gevay created FLINK-2540: -- Summary: LocalBufferPool.requestBuffer gets into infinite loop Key: FLINK-2540 URL: https://issues.apache.org/jira/browse/FLINK-2540 Project: Flink Issue Type: Bug Reporter: Gabor Gevay I'm trying to run a complicated computation that looks like this: [1]. One of the DataSource-Filter-Map chains finishes fine, but the other one freezes. Debugging shows that it is spinning in the while loop in LocalBufferPool.requestBuffer. askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize are 128, so neither of those if branches is ever taken. This is a stack trace: [2] And here is the code, if you would like to run it: [3]. Unfortunately, I can't make it more minimal, because if I remove some operators, the problem disappears. The class to start is malom.Solver. (On first run, it calculates some lookup tables for a few minutes, and puts them into /tmp/movegen) [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt [3] https://github.com/ggevay/flink/tree/deadlock-malom -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop
[ https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2540: --- Fix Version/s: 0.9.1 0.10 LocalBufferPool.requestBuffer gets into infinite loop - Key: FLINK-2540 URL: https://issues.apache.org/jira/browse/FLINK-2540 Project: Flink Issue Type: Bug Components: Core Reporter: Gabor Gevay Fix For: 0.10, 0.9.1 I'm trying to run a complicated computation that looks like this: [1]. One of the DataSource-Filter-Map chains finishes fine, but the other one freezes. Debugging shows that it is spinning in the while loop in LocalBufferPool.requestBuffer. askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize are 128, so neither of those if branches is ever taken. This is a stack trace: [2] And here is the code, if you would like to run it: [3]. Unfortunately, I can't make it more minimal, because if I remove some operators, the problem disappears. The class to start is malom.Solver. (On first run, it calculates some lookup tables for a few minutes, and puts them into /tmp/movegen) [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt [3] https://github.com/ggevay/flink/tree/deadlock-malom -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2542) It should be documented that it is required from a join key to override hashCode(), when it is not a POJO
Gabor Gevay created FLINK-2542: -- Summary: It should be documented that it is required from a join key to override hashCode(), when it is not a POJO Key: FLINK-2542 URL: https://issues.apache.org/jira/browse/FLINK-2542 Project: Flink Issue Type: Bug Components: Gelly, Java API Reporter: Gabor Gevay Priority: Minor Fix For: 0.10, 0.9.1 If the join key is not a POJO, and does not override hashCode, then the join silently fails (produces empty output). I don't see this documented anywhere. The Gelly documentation should also have this info separately, because it does joins internally on the vertex IDs, but the user might not know this, or might not look at the join documentation when using Gelly. Here is example code:
{noformat}
public static class ID implements Comparable<ID> {
	public long foo; //no default ctor -- not a POJO

	public ID(long foo) { this.foo = foo; }

	@Override
	public int compareTo(ID o) { return ((Long)foo).compareTo(o.foo); }

	@Override
	public boolean equals(Object o0) {
		if(o0 instanceof ID) {
			ID o = (ID)o0;
			return foo == o.foo;
		} else {
			return false;
		}
	}

	@Override
	public int hashCode() { return 42; }
}

public static void main(String[] args) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<ID, Long>> inDegrees = env.fromElements(Tuple2.of(new ID(123l), 4l));
	DataSet<Tuple2<ID, Long>> outDegrees = env.fromElements(Tuple2.of(new ID(123l), 5l));
	DataSet<Tuple3<ID, Long, Long>> degrees = inDegrees
		.join(outDegrees, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
		.with(new FlatJoinFunction<Tuple2<ID, Long>, Tuple2<ID, Long>, Tuple3<ID, Long, Long>>() {
			@Override
			public void join(Tuple2<ID, Long> first, Tuple2<ID, Long> second, Collector<Tuple3<ID, Long, Long>> out) {
				out.collect(new Tuple3<ID, Long, Long>(first.f0, first.f1, second.f1));
			}
		}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
	System.out.println("degrees count: " + degrees.count());
}
{noformat}
This prints 1, but if I comment out the hashCode, it prints 0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
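The contract at issue can be demonstrated without Flink: any hash-based lookup buckets keys by hashCode(), so two equal() keys that fall back on Object's identity hashCode almost never find each other, which is the same mechanism behind the silently empty join output above. A plain-JDK sketch with illustrative class names:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only (JDK HashMap, not Flink's hash join): with the default
// identity hashCode, two equals() instances hash differently, so the probe
// silently finds nothing -- analogous to the empty join output above.
public class HashCodeContractDemo {
    public static final class BadKey {
        public final long foo;
        public BadKey(long foo) { this.foo = foo; }
        @Override public boolean equals(Object o) {
            return o instanceof BadKey && ((BadKey) o).foo == foo;
        }
        // hashCode() deliberately NOT overridden
    }

    public static final class GoodKey {
        public final long foo;
        public GoodKey(long foo) { this.foo = foo; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).foo == foo;
        }
        @Override public int hashCode() { return Long.hashCode(foo); }
    }

    public static void main(String[] args) {
        Map<BadKey, Long> bad = new HashMap<>();
        bad.put(new BadKey(123L), 4L);
        System.out.println(bad.get(new BadKey(123L)));  // typically null: match missed

        Map<GoodKey, Long> good = new HashMap<>();
        good.put(new GoodKey(123L), 4L);
        System.out.println(good.get(new GoodKey(123L))); // 4
    }
}
```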
[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
Gabor Gevay created FLINK-2527: -- Summary: If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set Key: FLINK-2527 URL: https://issues.apache.org/jira/browse/FLINK-2527 Project: Flink Issue Type: Bug Components: Gelly Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 0.10, 0.9.1 The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable. I see three ways to resolve this: 1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this. 2. In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns. 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.) I like 2. the best. What are your opinions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
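Option 2 above could be sketched like this (stand-in class and method names, not Gelly's actual VertexUpdateFunction internals): buffer only the most recent value passed to setNewVertexValue, and emit it to the collector exactly once after the user's updateVertex call returns, so repeated calls cannot put multiple values into the workset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of option 2: instead of forwarding every
// setNewVertexValue call to the collector, remember only the latest value
// and flush it once after updateVertex returns.
public class LastValueWinsSketch<V> {
    private V pending;        // last value set during the current update
    private boolean isSet;
    private final List<V> workset = new ArrayList<>(); // stand-in for the Collector

    public void setNewVertexValue(V value) {
        pending = value;      // overwrite; nothing is emitted yet
        isSet = true;
    }

    /** Would be called by the framework after the user's updateVertex returns. */
    public void flush() {
        if (isSet) {
            workset.add(pending); // exactly one value reaches the workset
            pending = null;
            isSet = false;
        }
    }

    public List<V> workset() { return workset; }

    public static void main(String[] args) {
        LastValueWinsSketch<Integer> ctx = new LastValueWinsSketch<>();
        ctx.setNewVertexValue(1);  // the user function calls it twice...
        ctx.setNewVertexValue(2);
        ctx.flush();
        System.out.println(ctx.workset()); // [2]: only the last value survives
    }
}
```

Note that, as the follow-up comment discusses, this still would not update the vertex argument seen by the user function mid-update, which is part of why option 1 may be the simpler API.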
[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698423#comment-14698423 ] Gabor Gevay commented on FLINK-2527: I'm also leaning towards (1) now. I have actually implemented (2) in the meantime, but then I realized that it sets a subtle trap for the user, which I immediately fell into :) In my user function, I have a loop over the messages, and for each message I decide whether to set some new vertex value. This loop might set a new value multiple times, and the last one should be retained. At first, I liked (2) better, because if we have (1), then I essentially have to implement (2) inside my user function anyway. I thought that this situation is probably a common one, so why have everyone reimplement (2) inside their user functions, if we can do it in Gelly? However, the trap is that my code inside the loop implicitly assumed that the setNewVertexValue function updates the vertex variable (the first parameter of the UDF), but it does not. Of course, we could make setNewVertexValue do this update, but this is getting complicated. So it is probably best to go with (1), to keep the API nice and simple. If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set - Key: FLINK-2527 URL: https://issues.apache.org/jira/browse/FLINK-2527 Project: Flink Issue Type: Bug Components: Gelly Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 0.10, 0.9.1 The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable. I see three ways to resolve this: 1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this. 2. In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns. 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.) I like 2. the best. What are your opinions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
Gabor Gevay created FLINK-2447: -- Summary: TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Reporter: Gabor Gevay Consider the following code:
{noformat}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
	@Override
	public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
		return null;
	}
});
{noformat}
where FooBarPojo is the following type:
{noformat}
public class FooBarPojo {
	public int foo, bar;
	public FooBarPojo() {}
}
{noformat}
This should print a tuple type with two identical fields:
{noformat}
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>
{noformat}
But it prints the following instead:
{noformat}
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>
{noformat}
Note that this problem causes some co-groups in Gelly to crash with "org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys are not compatible with each other" when the vertex ID type is a POJO, because the second field of the Edge type becomes a generic type, while the POJO is recognized in the Vertex type, and getNumberOfKeyFields returns different numbers for the POJO and the generic type. The source of the problem is the mechanism in TypeExtractor that detects recursive types (see the alreadySeen field in TypeExtractor): it mistakes the second appearance of FooBarPojo for a recursive field.
Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls itself for the first field, which proceeds into the privateGetForClass case, which correctly detects that it is a POJO and correctly returns a PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field and goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is being processed. (Note that if we comment out the recursive type detection (the lines that work with the alreadySeen field), then the output is correct.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
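One way to avoid this false positive is to scope the recursion check to the current traversal path rather than keeping a global alreadySeen set. The sketch below is an illustrative stand-alone model with hypothetical names, not TypeExtractor's actual code; it only contrasts the two bookkeeping strategies.

```java
import java.util.HashSet;
import java.util.Set;

public class SeenSetDemo {
    // Global "alreadySeen" set: the second sibling field of the same type
    // looks recursive, which is the false positive described in the issue.
    static boolean globalSeen(Set<String> seen, String type) {
        return !seen.add(type); // true => treated as recursive (wrong for siblings)
    }

    // Path-scoped tracking: a type counts as recursive only if it appears in
    // the chain of types currently being analyzed; the type is removed again
    // when its subtree is done, so sibling fields don't interfere.
    static boolean analyzeField(Set<String> path, String type) {
        if (path.contains(type)) {
            return true; // genuine recursion (type contains itself)
        }
        path.add(type);
        // ... nested fields of `type` would be analyzed here ...
        path.remove(type); // leave the type when done with this subtree
        return false;
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        System.out.println(globalSeen(seen, "FooBarPojo")); // false (first field OK)
        System.out.println(globalSeen(seen, "FooBarPojo")); // true (second field wrongly flagged)

        Set<String> path = new HashSet<>();
        System.out.println(analyzeField(path, "FooBarPojo")); // false
        System.out.println(analyzeField(path, "FooBarPojo")); // false (sibling correctly OK)
    }
}
```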
[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2447: --- Description: (full description quoted in the issue above) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2447: --- Component/s: Java API TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type --- Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Components: Java API Reporter: Gabor Gevay (full description quoted in the issue above) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2447: --- Fix Version/s: 0.9.1 0.10 TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type --- Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Components: Java API Reporter: Gabor Gevay Fix For: 0.10, 0.9.1 (full description quoted in the issue above) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2360) EOFException
[ https://issues.apache.org/jira/browse/FLINK-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2360: --- Fix Version/s: 0.9.1 0.10 EOFException Key: FLINK-2360 URL: https://issues.apache.org/jira/browse/FLINK-2360 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.10 Reporter: Andra Lungu Priority: Critical Fix For: 0.10, 0.9.1 The following code: https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/NodeSplittingConnectedComponents.java What the code does, on a very high level: 1). Discovers the skewed nodes in a graph and splits them into subnodes, recursively, in levels, until we achieve a more uniform degree distribution. 2). Creates a delta iteration that takes the split data set as a solution set. On this, it runs the Connected Components algorithm. At the end of each superstep, the partial results computed by the subvertices are gathered back into the initial vertex, updating the overall value in the split vertices. 3). Once the iteration has converged, the graph is brought back to its initial state. Ran on the twitter follower graph: http://twitter.mpi-sws.org/data-icwsm2010.html With a similar configuration to the one in FLINK-2293.
Fails with:
{noformat}
Caused by: java.io.EOFException
	at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
	at org.apache.flink.types.StringValue.writeString(StringValue.java:796)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:63)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
	at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:722)
{noformat}
Job Manager log: https://gist.github.com/andralungu/9fc100603ba8d4b8d686 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection
Gabor Gevay created FLINK-2437: -- Summary: TypeExtractor.analyzePojo has some problems around the default constructor detection Key: FLINK-2437 URL: https://issues.apache.org/jira/browse/FLINK-2437 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor If a class does have a default constructor, but the user forgot to make it public, then TypeExtractor.analyzePojo still thinks everything is OK, so it creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up. Furthermore, a return null seems to be missing from the then-case of the if after catching the NoSuchMethodException, which would also cause a headache for PojoSerializer. An additional minor issue is that the word "class" is printed twice in several places, because Class.toString also prepends it to the class name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
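The check that analyzePojo would need can be illustrated with plain reflection: Class#getConstructor returns only public constructors, so probing with it (rather than getDeclaredConstructor) distinguishes a public default constructor from a non-public one. A minimal stand-alone sketch, not TypeExtractor's actual code:

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class CtorCheck {
    // A POJO needs a *public* no-arg constructor. getConstructor() only sees
    // public constructors and throws NoSuchMethodException otherwise, so it
    // is the right probe here; the Modifier check is then just belt-and-braces.
    static boolean hasPublicDefaultCtor(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false; // no public default constructor
        }
    }

    static class PrivateCtor {
        private PrivateCtor() {} // default ctor exists but is not public
    }

    public static class PublicCtor {
        public PublicCtor() {}
    }

    public static void main(String[] args) {
        System.out.println(hasPublicDefaultCtor(PublicCtor.class));  // true
        System.out.println(hasPublicDefaultCtor(PrivateCtor.class)); // false
    }
}
```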
[jira] [Commented] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection
[ https://issues.apache.org/jira/browse/FLINK-2437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14647915#comment-14647915 ] Gabor Gevay commented on FLINK-2437: I just realized that it is not a problem that analyzePojo lets abstract classes through, so the return null is not missing, sorry. TypeExtractor.analyzePojo has some problems around the default constructor detection Key: FLINK-2437 URL: https://issues.apache.org/jira/browse/FLINK-2437 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor If a class does have a default constructor, but the user forgot to make it public, then TypeExtractor.analyzePojo still thinks everything is OK, so it creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up. Furthermore, a return null seems to be missing from the then case of the if after catching the NoSuchMethodException which would also cause a headache for PojoSerializer. An additional minor issue is that the word class is printed twice in several places, because class.toString also prepends it to the class name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633525#comment-14633525 ] Gabor Gevay commented on FLINK-2377: Yes, I have found the problem: TestBaseUtils.readAllResultLines is not closing the readers. I will open a PR that fixes this. AbstractTestBase.deleteAllTempFiles sometimes fails on Windows -- Key: FLINK-2377 URL: https://issues.apache.org/jira/browse/FLINK-2377 Project: Flink Issue Type: Bug Components: Tests Environment: Windows Reporter: Gabor Gevay Priority: Minor This is probably another file closing issue. (that is, Windows won't delete open files, as opposed to Linux) I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
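The fix direction is the standard try-with-resources pattern: a reader closed in all paths releases its file handle, so Windows can delete the temp file afterwards. The sketch below is a hypothetical stand-in for readAllResultLines, not Flink's actual test-utils code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class ReadLines {
    // Collect all lines from a list of readers, closing each reader even if
    // reading throws; on Windows, an unclosed reader blocks file deletion.
    static List<String> readAllResultLines(List<Reader> readers) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Reader r : readers) {
            try (BufferedReader br = new BufferedReader(r)) { // closed automatically
                String line;
                while ((line = br.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        List<Reader> rs = new ArrayList<>();
        rs.add(new StringReader("a\nb"));
        rs.add(new StringReader("c"));
        System.out.println(readAllResultLines(rs)); // [a, b, c]
    }
}
```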
[jira] [Created] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail
Gabor Gevay created FLINK-2374: -- Summary: File.setWritable doesn't work on Windows, which makes several tests fail Key: FLINK-2374 URL: https://issues.apache.org/jira/browse/FLINK-2374 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The tests that use setWritable to test the handling of certain error conditions should be skipped in case we are under Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
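A common way to skip such tests is to guard on the operating system. The sketch below uses only the os.name system property; in a JUnit test one would typically call org.junit.Assume.assumeFalse(isWindows()) instead, which marks the test as skipped rather than passed.

```java
public class WindowsSkip {
    // Tests relying on File#setWritable should not run on Windows, where
    // setWritable(false) does not prevent writing; this helper detects the OS.
    static boolean isWindows() {
        return System.getProperty("os.name", "").toLowerCase().startsWith("windows");
    }

    public static void main(String[] args) {
        if (isWindows()) {
            System.out.println("skipping setWritable-based test");
            return;
        }
        // ... run the read-only-file error-handling test here ...
        System.out.println("running setWritable-based test");
    }
}
```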
[jira] [Commented] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail
[ https://issues.apache.org/jira/browse/FLINK-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631270#comment-14631270 ] Gabor Gevay commented on FLINK-2374: Yes, it appears to be a duplicate, sorry. File.setWritable doesn't work on Windows, which makes several tests fail Key: FLINK-2374 URL: https://issues.apache.org/jira/browse/FLINK-2374 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The tests that use setWritable to test the handling of certain error conditions should be skipped in case we are under Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail
[ https://issues.apache.org/jira/browse/FLINK-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay closed FLINK-2374. -- Resolution: Duplicate File.setWritable doesn't work on Windows, which makes several tests fail Key: FLINK-2374 URL: https://issues.apache.org/jira/browse/FLINK-2374 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The tests that use setWritable to test the handling of certain error conditions should be skipped in case we are under Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2359) Add factory methods to the Java TupleX types
[ https://issues.apache.org/jira/browse/FLINK-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631386#comment-14631386 ] Gabor Gevay commented on FLINK-2359: You're welcome! Add factory methods to the Java TupleX types Key: FLINK-2359 URL: https://issues.apache.org/jira/browse/FLINK-2359 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Fix For: 0.10 The compiler doesn't infer generic type arguments from constructor arguments, which means that we have to call Tuple constructors like this: Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo"); I propose adding a factory method, which would provide the following alternative: Tuple2<Integer, String> t = Tuple2.create(5, "foo"); (Note that C++ and C# Tuples also have similar factory methods for the same reason.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2376) testFindConnectableAddress sometimes fails on Windows because of the time limit
Gabor Gevay created FLINK-2376: -- Summary: testFindConnectableAddress sometimes fails on Windows because of the time limit Key: FLINK-2376 URL: https://issues.apache.org/jira/browse/FLINK-2376 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
Gabor Gevay created FLINK-2377: -- Summary: AbstractTestBase.deleteAllTempFiles sometimes fails on Windows Key: FLINK-2377 URL: https://issues.apache.org/jira/browse/FLINK-2377 Project: Flink Issue Type: Bug Components: Tests Environment: Windows Reporter: Gabor Gevay Priority: Minor This is probably another file closing issue. (that is, Windows won't delete open files, as opposed to Linux) I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2378) complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows
Gabor Gevay created FLINK-2378: -- Summary: complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows Key: FLINK-2378 URL: https://issues.apache.org/jira/browse/FLINK-2378 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gabor Gevay Priority: Minor This only happens when Maven runs the test; I can't trigger it from the IDE. At first I thought that something was wrong with having shared variables between the tests in ComplexIntegrationTest, but when I eliminated the shared variables, the test still failed. The error msgs:
{noformat}
testGroupByFeedback(org.apache.flink.streaming.api.IterateTest) Time elapsed: 12.091 sec ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:169)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.AssertionError: null
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at org.apache.flink.streaming.api.IterateTest$6.close(IterateTest.java:447)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.closeOperator(StreamTask.java:182)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:112)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.lang.Thread.run(Thread.java:745)

complexIntegrationTest1(org.apache.flink.streaming.api.complex.ComplexIntegrationTest) Time elapsed: 15.989 sec FAILURE!
java.lang.AssertionError: Different number of lines in expected and obtained result. expected:<9> but was:<5>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:743)
	at org.junit.Assert.assertEquals(Assert.java:118)
	at org.junit.Assert.assertEquals(Assert.java:555)
	at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:272)
	at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:258)
	at org.apache.flink.streaming.api.complex.ComplexIntegrationTest.after(ComplexIntegrationTest.java:91)
{noformat}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2369) On Windows, in testFailingSortingDataSinkTask the temp file is not removed
Gabor Gevay created FLINK-2369: -- Summary: On Windows, in testFailingSortingDataSinkTask the temp file is not removed Key: FLINK-2369 URL: https://issues.apache.org/jira/browse/FLINK-2369 Project: Flink Issue Type: Bug Components: Distributed Runtime Environment: Windows 7 64-bit, JDK 8u51 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The test fails with the assert at the very end ("Temp output file has not been removed"). This happens because FileOutputFormat.tryCleanupOnError can't delete the file, because it is still open (note that Linux happily deletes open files). A fix would be to call this.format.close() not just in the finally block (in DataSinkTask.invoke), but also before the tryCleanupOnError call (around line 217). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
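The proposed ordering can be sketched as follows; the interface and method names below mirror the issue text but are illustrative, not Flink's actual DataSinkTask code.

```java
import java.io.IOException;

public class CleanupOrder {
    // Hypothetical minimal shape of an output format, for the sketch only.
    interface OutputFormat {
        void close() throws IOException;
        void tryCleanupOnError() throws IOException;
    }

    // Close the format *before* attempting cleanup: Windows refuses to delete
    // a file that still has an open handle, so the delete would silently fail
    // if cleanup ran first.
    static void handleFailure(OutputFormat format) {
        try {
            format.close(); // release the file handle first
        } catch (IOException ignored) {
        }
        try {
            format.tryCleanupOnError(); // now the delete can succeed on Windows
        } catch (IOException ignored) {
        }
    }

    public static void main(String[] args) {
        final StringBuilder order = new StringBuilder();
        handleFailure(new OutputFormat() {
            public void close() { order.append("close;"); }
            public void tryCleanupOnError() { order.append("cleanup;"); }
        });
        System.out.println(order); // close;cleanup;
    }
}
```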
[jira] [Commented] (FLINK-2359) Add factory methods to the Java TupleX types
[ https://issues.apache.org/jira/browse/FLINK-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626676#comment-14626676 ] Gabor Gevay commented on FLINK-2359: Oh, I didn't know about the diamond thing, thanks! Add factory methods to the Java TupleX types Key: FLINK-2359 URL: https://issues.apache.org/jira/browse/FLINK-2359 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The compiler doesn't infer generic type arguments from constructor arguments, which means that we have to call Tuple constructors like this: Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo"); I propose adding a factory method, which would provide the following alternative: Tuple2<Integer, String> t = Tuple2.create(5, "foo"); (Note that C++ and C# Tuples also have similar factory methods for the same reason.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2359) Add factory methods to the Java TupleX types
Gabor Gevay created FLINK-2359:
----------------------------------

             Summary: Add factory methods to the Java TupleX types
                 Key: FLINK-2359
                 URL: https://issues.apache.org/jira/browse/FLINK-2359
             Project: Flink
          Issue Type: Improvement
          Components: Java API
    Affects Versions: 0.10
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay
            Priority: Minor

The compiler doesn't infer generic type arguments from constructor arguments, which means that we have to call Tuple constructors like this:

Tuple2<Integer, String> = new Tuple2<Integer, String>(5, foo);

I propose adding a factory method, which would provide the following alternative:

Tuple2<Integer, String> = Tuple2.create(5, foo);

(Note that C++ and C# Tuples also have similar factory methods for the same reason.)
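The proposed factory can be sketched on a minimal stand-in Tuple2 (illustrative only, not the actual Flink class): a static generic method lets the compiler infer the type arguments from the call site, which a constructor cannot do in Java 6 syntax.

```java
public class TupleFactoryDemo {

    // Minimal stand-in for Flink's Tuple2, with the proposed factory method.
    static class Tuple2<T0, T1> {
        public T0 f0;
        public T1 f1;
        public Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }

        // The compiler infers <T0, T1> from the arguments, so the caller
        // doesn't have to repeat the type parameters.
        public static <T0, T1> Tuple2<T0, T1> create(T0 f0, T1 f1) {
            return new Tuple2<T0, T1>(f0, f1);
        }
    }

    public static void main(String[] args) {
        // Without the factory: the type arguments are written twice.
        Tuple2<Integer, String> a = new Tuple2<Integer, String>(5, "foo");
        // With the factory: inferred from the arguments.
        Tuple2<Integer, String> b = Tuple2.create(5, "foo");
        System.out.println(a.f0 + "," + a.f1 + " " + b.f0 + "," + b.f1);
    }
}
```

This mirrors `std::make_tuple` in C++ and `Tuple.Create` in C#, which exist for exactly this inference reason.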
[jira] [Assigned] (FLINK-2148) Approximately calculate the number of distinct elements of a stream
[ https://issues.apache.org/jira/browse/FLINK-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay reassigned FLINK-2148:
----------------------------------

    Assignee: Gabor Gevay

> Approximately calculate the number of distinct elements of a stream
> -------------------------------------------------------------------
>
>                 Key: FLINK-2148
>                 URL: https://issues.apache.org/jira/browse/FLINK-2148
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Minor
>              Labels: statistics
>
> In the paper http://people.seas.harvard.edu/~minilek/papers/f0.pdf Kane et al. describe an optimal algorithm for estimating the number of distinct elements in a data stream.
[jira] [Created] (FLINK-2255) In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec
Gabor Gevay created FLINK-2255:
----------------------------------

             Summary: In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec
                 Key: FLINK-2255
                 URL: https://issues.apache.org/jira/browse/FLINK-2255
             Project: Flink
          Issue Type: Bug
          Components: Examples, Streaming
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay
            Priority: Minor

The event times are generated by System.currentTimeMillis(), so evictionSec should be multiplied by 1000 when passing it to Time.of.
[jira] [Created] (FLINK-2181) SessionWindowing example has a memleak
Gabor Gevay created FLINK-2181:
----------------------------------

             Summary: SessionWindowing example has a memleak
                 Key: FLINK-2181
                 URL: https://issues.apache.org/jira/browse/FLINK-2181
             Project: Flink
          Issue Type: Bug
          Components: Examples, Streaming
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay

The trigger policy objects belonging to already terminated sessions are kept in memory, and NotifyOnLastGlobalElement also gets called on them. This causes the program to eat up more and more memory, and to get slower over time.

Mailing list discussion:
http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E
[jira] [Updated] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-2147:
-------------------------------
    Labels: statistics  (was: )

> Approximate calculation of frequencies in data streams
> ------------------------------------------------------
>
>                 Key: FLINK-2147
>                 URL: https://issues.apache.org/jira/browse/FLINK-2147
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Gabor Gevay
>            Priority: Minor
>              Labels: statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track of the frequencies of elements in a data stream. It is described by Cormode et al. in the following paper: http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed way, as described in section 3.2 of the paper.
> The paper http://www.vldb.org/conf/2002/S10P03.pdf also describes algorithms for approximately keeping track of frequencies, but there the user can specify a threshold below which she is not interested in the frequency of an element. The error bounds are also different from those of the Count-Min sketch algorithm.
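A minimal single-machine Count-Min sketch along these lines can be sketched as follows; the depth, width, seeds, and the multiply-shift hashing are arbitrary illustrative choices, not anything prescribed by the paper or by Flink:

```java
import java.util.Random;

// Toy Count-Min sketch: d rows of w counters; each element increments one
// counter per row, and a point query takes the minimum over the rows.
// Estimates never undercount the true frequency; they can only overcount.
public class CountMinSketch {
    private final int depth, width;
    private final long[][] counts;
    private final long[] seeds;

    public CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;  // assumed to be a power of two
        this.counts = new long[depth][width];
        this.seeds = new long[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) {
            seeds[i] = rnd.nextLong() | 1L;  // odd multiplier for multiply-shift
        }
    }

    private int bucket(int row, long x) {
        long h = x * seeds[row];
        return (int) ((h >>> 40) & (width - 1));
    }

    public void add(long x) {
        for (int i = 0; i < depth; i++) counts[i][bucket(i, x)]++;
    }

    public long estimate(long x) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, counts[i][bucket(i, x)]);
        }
        return min;
    }

    public static void main(String[] args) {
        CountMinSketch cms = new CountMinSketch(4, 1024, 42);
        for (int i = 0; i < 100; i++) cms.add(7);
        for (int i = 0; i < 50; i++) cms.add(13);
        System.out.println(cms.estimate(7) + " " + cms.estimate(13));
    }
}
```

The distributed variant from section 3.2 of the paper follows from the fact that sketches with the same seeds can be merged by element-wise addition of their counter arrays.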
[jira] [Created] (FLINK-2148) Approximately calculate the number of distinct elements of a stream
Gabor Gevay created FLINK-2148:
----------------------------------

             Summary: Approximately calculate the number of distinct elements of a stream
                 Key: FLINK-2148
                 URL: https://issues.apache.org/jira/browse/FLINK-2148
             Project: Flink
          Issue Type: Sub-task
            Reporter: Gabor Gevay
            Priority: Minor

In the paper http://people.seas.harvard.edu/~minilek/papers/f0.pdf Kane et al. describe an optimal algorithm for estimating the number of distinct elements in a data stream.
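The algorithm in the paper is involved; as a much simpler illustration of the general idea (and explicitly not the algorithm from the paper), here is a k-minimum-values estimator: keep the k smallest hash values seen so far, and estimate the distinct count from how densely they fill the hash range. The class name, k, and the splitmix64-style mixer are illustrative choices.

```java
import java.util.TreeSet;

// Illustrative k-minimum-values (KMV) distinct-count estimator; a classic
// simple sketch, NOT the optimal algorithm of Kane et al.
public class KmvEstimator {
    private final int k;
    private final TreeSet<Long> minHashes = new TreeSet<>();

    public KmvEstimator(int k) { this.k = k; }

    public void offer(long element) {
        minHashes.add(hash(element));
        if (minHashes.size() > k) minHashes.pollLast();  // keep the k smallest
    }

    public double estimate() {
        if (minHashes.size() < k) return minHashes.size();  // fewer than k distinct seen
        // The k-th smallest of n uniform hashes sits around k/n of the range.
        double fraction = (double) minHashes.last() / Long.MAX_VALUE;
        return (k - 1) / fraction;
    }

    // splitmix64-style finalizer as a stand-in hash; masked to stay non-negative.
    private static long hash(long x) {
        x += 0x9E3779B97F4A7C15L;
        x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
        x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
        return (x ^ (x >>> 31)) & Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        KmvEstimator est = new KmvEstimator(256);
        for (long i = 0; i < 100000; i++) est.offer(i % 10000);  // 10000 distinct values
        System.out.println(est.estimate());
    }
}
```

The relative error of KMV is roughly 1/sqrt(k); the point of the Kane et al. algorithm is to achieve optimal space for a given error bound, which this toy does not.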
[jira] [Updated] (FLINK-2145) Median calculation for windows
[ https://issues.apache.org/jira/browse/FLINK-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-2145:
-------------------------------
    Labels: statistics  (was: )

> Median calculation for windows
> ------------------------------
>
>                 Key: FLINK-2145
>                 URL: https://issues.apache.org/jira/browse/FLINK-2145
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Minor
>              Labels: statistics
>
> The PreReducer for this has the following algorithm: We maintain two multisets (as, for example, balanced binary search trees) that always partition the elements of the current window into smaller-than-median and larger-than-median elements. At each store and evict, we can maintain this invariant with only O(1) multiset operations.
[jira] [Created] (FLINK-2145) Median calculation for windows
Gabor Gevay created FLINK-2145:
----------------------------------

             Summary: Median calculation for windows
                 Key: FLINK-2145
                 URL: https://issues.apache.org/jira/browse/FLINK-2145
             Project: Flink
          Issue Type: Sub-task
          Components: Streaming
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay
            Priority: Minor

The PreReducer for this has the following algorithm: We maintain two multisets (as, for example, balanced binary search trees) that always partition the elements of the current window into smaller-than-median and larger-than-median elements. At each store and evict, we can maintain this invariant with only O(1) multiset operations.
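The two-multiset idea can be sketched with TreeMaps acting as counted multisets; this is a toy stand-alone version, not the actual Flink PreReducer interface, and it assumes the caller only evicts elements that are currently in the window.

```java
import java.util.TreeMap;

// Toy sketch of the two-multiset median PreReducer: `low` holds the smaller
// half of the window, `high` the larger half; TreeMaps with counts act as
// multisets, so each store/evict costs O(1) multiset (O(log n) tree) operations.
public class WindowMedian {
    private final TreeMap<Integer, Integer> low = new TreeMap<>();
    private final TreeMap<Integer, Integer> high = new TreeMap<>();
    private int lowSize = 0, highSize = 0;

    private static void inc(TreeMap<Integer, Integer> s, int x) {
        s.merge(x, 1, Integer::sum);
    }

    private static void dec(TreeMap<Integer, Integer> s, int x) {
        if (s.merge(x, -1, Integer::sum) == 0) s.remove(x);
    }

    public void store(int x) {
        if (lowSize == 0 || x <= low.lastKey()) { inc(low, x); lowSize++; }
        else { inc(high, x); highSize++; }
        rebalance();
    }

    public void evict(int x) {  // assumes x is currently in the window
        if (lowSize > 0 && x <= low.lastKey()) { dec(low, x); lowSize--; }
        else { dec(high, x); highSize--; }
        rebalance();
    }

    // Keep lowSize == highSize or lowSize == highSize + 1 by moving at most
    // one element across the partition boundary.
    private void rebalance() {
        if (lowSize > highSize + 1) {
            int m = low.lastKey(); dec(low, m); lowSize--; inc(high, m); highSize++;
        } else if (highSize > lowSize) {
            int m = high.firstKey(); dec(high, m); highSize--; inc(low, m); lowSize++;
        }
    }

    public int median() { return low.lastKey(); }  // lower median

    public static void main(String[] args) {
        WindowMedian m = new WindowMedian();
        for (int x : new int[]{1, 2, 3, 4, 5}) m.store(x);
        System.out.println(m.median());  // lower median of {1,2,3,4,5}
        m.evict(1);
        m.store(6);
        System.out.println(m.median());  // lower median of {2,3,4,5,6}
    }
}
```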
[jira] [Updated] (FLINK-2144) Implement count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-2144:
-------------------------------
    Labels: statistics  (was: )

> Implement count, average, and variance for windows
> --------------------------------------------------
>
>                 Key: FLINK-2144
>                 URL: https://issues.apache.org/jira/browse/FLINK-2144
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Minor
>              Labels: statistics
>
> By count I mean the number of elements in the window. These can be implemented very efficiently building on FLINK-2143.
[jira] [Updated] (FLINK-2144) Implement count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-2144:
-------------------------------
    Description: 
By count I mean the number of elements in the window. These can be implemented very efficiently building on FLINK-2143:
Store: O(1)
Evict: O(1)
emitWindow: O(1)
memory: O(1)

  was:
By count I mean the number of elements in the window. These can be implemented very efficiently building on FLINK-2143.

> Implement count, average, and variance for windows
> --------------------------------------------------
>
>                 Key: FLINK-2144
>                 URL: https://issues.apache.org/jira/browse/FLINK-2144
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Minor
>              Labels: statistics
>
> By count I mean the number of elements in the window. These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)
> memory: O(1)
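The O(1)-per-operation bounds can be illustrated with a toy pre-reducer that keeps only the count, the sum, and the sum of squares (a sketch, not the Flink API; note that the naive sum-of-squares variance formula can lose precision on large or badly scaled windows, so a production version might prefer a compensated scheme):

```java
// Toy pre-reducer for count, average, and variance of the current window:
// store, evict, and emit are all O(1) time, and the state is O(1) memory.
public class WindowStats {
    private long count = 0;
    private double sum = 0.0;
    private double sumSq = 0.0;

    public void store(double x) { count++; sum += x; sumSq += x * x; }
    public void evict(double x) { count--; sum -= x; sumSq -= x * x; }

    public long count() { return count; }
    public double average() { return sum / count; }

    // Population variance via E[X^2] - E[X]^2 (numerically naive).
    public double variance() {
        double mean = average();
        return sumSq / count - mean * mean;
    }

    public static void main(String[] args) {
        WindowStats s = new WindowStats();
        for (double x : new double[]{1, 2, 3, 4, 5}) s.store(x);
        System.out.println(s.count() + " " + s.average() + " " + s.variance());
        s.evict(1);
        s.store(6);  // window is now {2,3,4,5,6}
        System.out.println(s.count() + " " + s.average() + " " + s.variance());
    }
}
```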
[jira] [Created] (FLINK-2146) Fast calculation of min/max with arbitrary eviction and triggers
Gabor Gevay created FLINK-2146:
----------------------------------

             Summary: Fast calculation of min/max with arbitrary eviction and triggers
                 Key: FLINK-2146
                 URL: https://issues.apache.org/jira/browse/FLINK-2146
             Project: Flink
          Issue Type: Sub-task
            Reporter: Gabor Gevay
            Priority: Minor

The last algorithm described here could be used:
http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
It is based on a double-ended queue which maintains a sorted list of elements of the current window that have the possibility of being the maximal element in the future.
Store: O(1) amortized
Evict: O(1)
emitWindow: O(1)
memory: O(N)
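The deque algorithm from the linked post can be sketched like this (a toy stand-alone version, not the Flink eviction-policy API; it assumes the caller evicts the same elements it stored, in arrival order, and a full version would count duplicates rather than compare single values):

```java
import java.util.ArrayDeque;

// Toy sketch of the deque-based sliding-window maximum: the deque holds, in
// decreasing order, exactly those elements that can still become the maximum.
public class WindowMax {
    private final ArrayDeque<Integer> candidates = new ArrayDeque<>();

    public void store(int x) {
        // Elements smaller than x can never be the maximum again.
        while (!candidates.isEmpty() && candidates.peekLast() < x) {
            candidates.pollLast();
        }
        candidates.addLast(x);  // each element enters and leaves once: O(1) amortized
    }

    public void evict(int x) {
        // Only the current front can be the element leaving the window.
        if (!candidates.isEmpty() && candidates.peekFirst() == x) {
            candidates.pollFirst();
        }
    }

    public int max() { return candidates.peekFirst(); }

    public static void main(String[] args) {
        int[] stream = {1, 3, 2, 5, 4};
        int windowSize = 3;
        WindowMax wm = new WindowMax();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < stream.length; i++) {
            wm.store(stream[i]);
            if (i >= windowSize) wm.evict(stream[i - windowSize]);
            sb.append(wm.max()).append(' ');
        }
        System.out.println(sb.toString().trim());
    }
}
```

For min, the comparisons are simply reversed.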
[jira] [Created] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
Gabor Gevay created FLINK-2143:
----------------------------------

             Summary: Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
                 Key: FLINK-2143
                 URL: https://issues.apache.org/jira/browse/FLINK-2143
             Project: Flink
          Issue Type: Sub-task
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay

If the inverse of the reduceFunction is also available (for example subtraction when summing numbers), then a PreReducer can maintain the aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.
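The idea can be sketched generically: given the reduce function, its inverse, and an identity element, the pre-reducer keeps a single running aggregate (a toy sketch with hypothetical names, not Flink's actual PreReducer or reduceWindow signature).

```java
import java.util.function.BinaryOperator;

// Toy pre-reducer: with reduce and its inverse (e.g. + and -), the current
// window's aggregate is maintained in O(1) time and memory per operation.
public class InvertiblePreReducer<T> {
    private final BinaryOperator<T> reduce;
    private final BinaryOperator<T> inverse;  // inverse.apply(reduce.apply(a, b), b) == a
    private T aggregate;

    public InvertiblePreReducer(BinaryOperator<T> reduce,
                                BinaryOperator<T> inverse,
                                T identity) {
        this.reduce = reduce;
        this.inverse = inverse;
        this.aggregate = identity;
    }

    public void store(T x) { aggregate = reduce.apply(aggregate, x); }   // O(1)
    public void evict(T x) { aggregate = inverse.apply(aggregate, x); }  // O(1)
    public T emitWindow() { return aggregate; }

    public static void main(String[] args) {
        InvertiblePreReducer<Integer> sum =
                new InvertiblePreReducer<>((a, b) -> a + b, (a, b) -> a - b, 0);
        for (int x : new int[]{1, 2, 3, 4}) sum.store(x);
        System.out.println(sum.emitWindow());  // 1+2+3+4
        sum.evict(1);
        sum.store(5);
        System.out.println(sum.emitWindow());  // 2+3+4+5
    }
}
```

Without the inverse, a pre-reducer for an arbitrary reduceFunction has to keep the window's elements (or a tree of partial aggregates) to recompute after eviction, which is what this overload avoids.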
[jira] [Created] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows
Gabor Gevay created FLINK-2142:
----------------------------------

             Summary: GSoC project: Exact and Approximate Statistics for Data Streams and Windows
                 Key: FLINK-2142
                 URL: https://issues.apache.org/jira/browse/FLINK-2142
             Project: Flink
          Issue Type: New Feature
          Components: Streaming
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay
            Priority: Minor

The goal of this project is to implement basic statistics of data streams and windows (like average, median, variance, correlation, etc.) in a computationally efficient manner. This involves designing custom preaggregators.

The exact calculation of some statistics (e.g. frequencies, or the number of distinct elements) would require memory proportional to the number of elements in the input (the window or the entire stream). However, there are efficient algorithms and data structures using less memory for calculating the same statistics only approximately, with user-specified error bounds.