[jira] [Commented] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15168808#comment-15168808
 ] 

Gabor Gevay commented on FLINK-3519:


So you mean that you are in favor of option 2?

> Subclasses of Tuples don't work if the declared type of a DataSet is not the 
> descendant
> ---
>
> Key: FLINK-3519
> URL: https://issues.apache.org/jira/browse/FLINK-3519
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> If I have a subclass of TupleN, then objects of this type will turn into 
> TupleNs when I try to use them in a DataSet.
> For example, if I have a class like this:
> {code}
> public static class Foo extends Tuple1<Integer> {
>   public short a;
>   public Foo() {}
>   public Foo(int f0, int a) {
>   this.f0 = f0;
>   this.a = (short)a;
>   }
>   @Override
>   public String toString() {
>   return "(" + f0 + ", " + a + ")";
>   }
> }
> {code}
> And then I do this:
> {code}
> env.fromElements(0,0,0).map(new MapFunction<Integer, Tuple1<Integer>>() {
>   @Override
>   public Tuple1<Integer> map(Integer value) throws Exception {
>   return new Foo(5, 6);
>   }
> }).print();
> {code}
> Then I don't have Foos in the output, but only Tuples:
> {code}
> (5)
> (5)
> (5)
> {code}
> The problem is caused by the TupleSerializer not caring about subclasses at 
> all. I guess the reason for this is performance: we don't want to deal with 
> writing and reading subclass tags when we have Tuples.
> I see three options for solving this:
> 1. Add subclass tags to the TupleSerializer: This is not really an option, 
> because we don't want to lose performance.
> 2. Document this behavior in the javadoc of the Tuple classes.
> 3. Make the Tuple types final: this would be the clean solution, but it is 
> API breaking, and the first victim would be Gelly: the Vertex and Edge types 
> extend from tuples. (Note that the issue doesn't appear there, because the 
> DataSets there always have the type of the descendant class.)
> When deciding between 2. and 3., an important point to note is that if you 
> have your class extend from a Tuple type instead of just adding the f0, f1, 
> ... fields manually in the hopes of getting the performance boost associated 
> with Tuples, then you are out of luck: the PojoSerializer will kick in anyway 
> when the declared types of your DataSets are the descendant type.
> If someone knows about a good reason to extend from a Tuple class, then 
> please comment.
> For 2., this is a suggested wording for the javadoc of the Tuple classes:
> Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
> always declare the element type of your DataSets to your descendant type. 
> (That is, if you have a "class A extends Tuple2", then don't use instances of 
> A in a DataSet<Tuple2>, but use DataSet<A>.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3519:
---
Affects Version/s: 1.0.0

> Subclasses of Tuples don't work if the declared type of a DataSet is not the 
> descendant
> ---
>
> Key: FLINK-3519
> URL: https://issues.apache.org/jira/browse/FLINK-3519
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> If I have a subclass of TupleN, then objects of this type will turn into 
> TupleNs when I try to use them in a DataSet.
> For example, if I have a class like this:
> {code}
> public static class Foo extends Tuple1<Integer> {
>   public short a;
>   public Foo() {}
>   public Foo(int f0, int a) {
>   this.f0 = f0;
>   this.a = (short)a;
>   }
>   @Override
>   public String toString() {
>   return "(" + f0 + ", " + a + ")";
>   }
> }
> {code}
> And then I do this:
> {code}
> env.fromElements(0,0,0).map(new MapFunction<Integer, Tuple1<Integer>>() {
>   @Override
>   public Tuple1<Integer> map(Integer value) throws Exception {
>   return new Foo(5, 6);
>   }
> }).print();
> {code}
> Then I don't have Foos in the output, but only Tuples:
> {code}
> (5)
> (5)
> (5)
> {code}
> The problem is caused by the TupleSerializer not caring about subclasses at 
> all. I guess the reason for this is performance: we don't want to deal with 
> writing and reading subclass tags when we have Tuples.
> I see three options for solving this:
> 1. Add subclass tags to the TupleSerializer: This is not really an option, 
> because we don't want to lose performance.
> 2. Document this behavior in the javadoc of the Tuple classes.
> 3. Make the Tuple types final: this would be the clean solution, but it is 
> API breaking, and the first victim would be Gelly: the Vertex and Edge types 
> extend from tuples. (Note that the issue doesn't appear there, because the 
> DataSets there always have the type of the descendant class.)
> When deciding between 2. and 3., an important point to note is that if you 
> have your class extend from a Tuple type instead of just adding the f0, f1, 
> ... fields manually in the hopes of getting the performance boost associated 
> with Tuples, then you are out of luck: the PojoSerializer will kick in anyway 
> when the declared types of your DataSets are the descendant type.
> If someone knows about a good reason to extend from a Tuple class, then 
> please comment.
> For 2., this is a suggested wording for the javadoc of the Tuple classes:
> Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
> always declare the element type of your DataSets to your descendant type. 
> (That is, if you have a "class A extends Tuple2", then don't use instances of 
> A in a DataSet<Tuple2>, but use DataSet<A>.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3519:
--

 Summary: Subclasses of Tuples don't work if the declared type of a 
DataSet is not the descendant
 Key: FLINK-3519
 URL: https://issues.apache.org/jira/browse/FLINK-3519
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gabor Gevay
Priority: Minor


If I have a subclass of TupleN, then objects of this type will turn into 
TupleNs when I try to use them in a DataSet.

For example, if I have a class like this:
{code}
public static class Foo extends Tuple1<Integer> {
public short a;

public Foo() {}

public Foo(int f0, int a) {
this.f0 = f0;
this.a = (short)a;
}

@Override
public String toString() {
return "(" + f0 + ", " + a + ")";
}
}
{code}
And then I do this:
{code}
env.fromElements(0,0,0).map(new MapFunction<Integer, Tuple1<Integer>>() {
@Override
public Tuple1<Integer> map(Integer value) throws Exception {
return new Foo(5, 6);
}
}).print();
{code}
Then I don't have Foos in the output, but only Tuples:
{code}
(5)
(5)
(5)
{code}

The problem is caused by the TupleSerializer not caring about subclasses at 
all. I guess the reason for this is performance: we don't want to deal with 
writing and reading subclass tags when we have Tuples.

I see three options for solving this:
1. Add subclass tags to the TupleSerializer: This is not really an option, 
because we don't want to lose performance.
2. Document this behavior in the javadoc of the Tuple classes.
3. Make the Tuple types final: this would be the clean solution, but it is API 
breaking, and the first victim would be Gelly: the Vertex and Edge types extend 
from tuples. (Note that the issue doesn't appear there, because the DataSets 
there always have the type of the descendant class.)

When deciding between 2. and 3., an important point to note is that if you have 
your class extend from a Tuple type instead of just adding the f0, f1, ... 
fields manually in the hopes of getting the performance boost associated with 
Tuples, then you are out of luck: the PojoSerializer will kick in anyway when 
the declared types of your DataSets are the descendant type.

If someone knows about a good reason to extend from a Tuple class, then please 
comment.

For 2., this is a suggested wording for the javadoc of the Tuple classes:
Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
always declare the element type of your DataSets to your descendant type. (That 
is, if you have a "class A extends Tuple2", then don't use instances of A in a 
DataSet<Tuple2>, but use DataSet<A>.)
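
For illustration, a hedged sketch reusing the Foo class from above (not part of 
the proposed javadoc text itself):
{code}
// Wrong: the declared element type is the base Tuple1<Integer>, so the
// TupleSerializer is chosen and the Foo-specific field "a" is lost.
DataSet<Tuple1<Integer>> lossy = env.fromElements(0)
    .map(new MapFunction<Integer, Tuple1<Integer>>() {
        @Override
        public Tuple1<Integer> map(Integer value) throws Exception {
            return new Foo(5, 6);
        }
    });

// Right: the declared element type is the descendant Foo, so the
// PojoSerializer is chosen and the extra field survives.
DataSet<Foo> intact = env.fromElements(0)
    .map(new MapFunction<Integer, Foo>() {
        @Override
        public Foo map(Integer value) throws Exception {
            return new Foo(5, 6);
        }
    });
{code}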



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167415#comment-15167415
 ] 

Gabor Gevay commented on FLINK-3322:


It's making it even worse:
false, 500m: 30s
false, 5000m: 115s
true, 500m: 8s
true, 5000m: 10s


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167377#comment-15167377
 ] 

Gabor Gevay commented on FLINK-3322:


I have constructed a much simpler example to demonstrate the problem: 
ConnectedComponents on a graph that is a path of length 1000: 1->2, 2->3, 3->4, 
4->5, ... 999->1000:
https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc-2
The class to run is org.apache.flink.graph.example.ConnectedComponents. Try 
increasing the memory from e.g. 500m to 5000m, and look at the difference when 
taskmanager.memory.preallocate is true and false (TaskManager.scala:1713).
I measured the following times on my laptop:
false, 500m:  14s
false, 5000m: 115s
true, 500m:   8s
true, 5000m:  13s

(I guess that the difference between the two runs where preallocate is true is 
due to the time it takes for the JVM to allocate the memory once, but it should 
also be checked that this isn't due to some other unexpected reason.)

So the bottom line is that the problem gets worse when there are more 
iterations. (We have 1001 iterations in the linked example.)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3277) Use Value types in Gelly API

2016-02-18 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152504#comment-15152504
 ] 

Gabor Gevay commented on FLINK-3277:


There is one extra indirection (a pointer) when something is a CopyableValue, 
compared to when it is a primitive type like long. But this probably has only 
a very small performance effect.

> Use Value types in Gelly API
> 
>
> Key: FLINK-3277
> URL: https://issues.apache.org/jira/browse/FLINK-3277
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> This would be a breaking change so the discussion needs to happen before the 
> 1.0.0 release.
> I think it would benefit Flink to use {{Value}} types wherever possible. The 
> {{Graph}} functions {{inDegrees}}, {{outDegrees}}, and {{getDegrees}} each 
> return {{DataSet<Tuple2<K, Long>>}}. Using {{Long}} creates a new heap object 
> for every serialization and deserialization. The mutable {{Value}} types do 
> not suffer from this issue when object reuse is enabled.
> I lean towards a preference for conciseness in documentation and performance 
> in examples and APIs.
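
As a hedged illustration of the reuse benefit the description refers to 
({{LongValue}} is Flink's mutable counterpart of {{Long}}; simplified, not 
taken from the Gelly code):
{code}
// A mutable LongValue can be updated in place, so with object reuse enabled
// no new heap object is needed per record:
LongValue degree = new LongValue();
degree.setValue(degree.getValue() + 1); // mutate, no allocation

// A boxed Long, by contrast, allocates on every change (outside the small
// autoboxing cache):
Long boxed = 1000000L;
boxed = boxed + 1; // creates a new Long instance
{code}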



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3277) Use Value types in Gelly API

2016-02-18 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152448#comment-15152448
 ] 

Gabor Gevay commented on FLINK-3277:


Ah, yes, you are right, this wouldn't work with Tuples. It would only work with 
POJOs having `long` fields.
So, it seems that the current performance difference between Tuples and POJOs 
would likely get reversed once code generation in the serializers is 
introduced.

> Use Value types in Gelly API
> 
>
> Key: FLINK-3277
> URL: https://issues.apache.org/jira/browse/FLINK-3277
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> This would be a breaking change so the discussion needs to happen before the 
> 1.0.0 release.
> I think it would benefit Flink to use {{Value}} types wherever possible. The 
> {{Graph}} functions {{inDegrees}}, {{outDegrees}}, and {{getDegrees}} each 
> return {{DataSet<Tuple2<K, Long>>}}. Using {{Long}} creates a new heap object 
> for every serialization and deserialization. The mutable {{Value}} types do 
> not suffer from this issue when object reuse is enabled.
> I lean towards a preference for conciseness in documentation and performance 
> in examples and APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3277) Use Value types in Gelly API

2016-02-18 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152407#comment-15152407
 ] 

Gabor Gevay commented on FLINK-3277:


I think this issue (that new heap objects are created when deserializing a 
Tuple/POJO with a long field) will have a clean solution once we have code 
generation for the serializers. Then fields of primitive types (like long) can 
be handled specially: the generated code won't call the fieldSerializer, but 
will directly call readLong and write the result into the field.
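
A hedged sketch of what such generated code might look like (hypothetical: the 
code generator does not exist yet, and MyPojo/count are made-up names):
{code}
// Hypothetical generated deserializer for a POJO with a primitive long field:
public MyPojo deserialize(MyPojo reuse, DataInputView source) throws IOException {
    // No fieldSerializer call and no boxing: read the primitive directly
    // and write it into the field.
    reuse.count = source.readLong();
    return reuse;
}
{code}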

> Use Value types in Gelly API
> 
>
> Key: FLINK-3277
> URL: https://issues.apache.org/jira/browse/FLINK-3277
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> This would be a breaking change so the discussion needs to happen before the 
> 1.0.0 release.
> I think it would benefit Flink to use {{Value}} types wherever possible. The 
> {{Graph}} functions {{inDegrees}}, {{outDegrees}}, and {{getDegrees}} each 
> return {{DataSet<Tuple2<K, Long>>}}. Using {{Long}} creates a new heap object 
> for every serialization and deserialization. The mutable {{Value}} types do 
> not suffer from this issue when object reuse is enabled.
> I lean towards a preference for conciseness in documentation and performance 
> in examples and APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3322:
---
Priority: Critical  (was: Major)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2016-02-12 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3394:
--

 Summary: Clear up the contract of MutableObjectIterator.next(reuse)
 Key: FLINK-3394
 URL: https://issues.apache.org/jira/browse/FLINK-3394
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Priority: Critical


{{MutableObjectIterator.next(reuse)}} has the following contract (according to 
[~StephanEwen]'s comment \[1\]):

1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.

This should be documented in its javadoc (with "WARNING" so that people don't 
overlook it).

Additionally, since this was a "secret contract" up to now, all the 270 usages 
of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A 
few that are suspicious at first glance are in {{CrossDriver}}, 
{{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, 
{{ReusingBuildFirstHashJoinIterator.callWithNextKey}}.

\[1\] 
https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654
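
For illustration, a hedged sketch of a call site that obeys both points above 
(simplified; {{process}} and the surrounding names are made up):
{code}
T reuse = serializer.createInstance();
T record;
while ((record = iterator.next(reuse)) != null) {
    // Point 1: from here on, the caller must not touch `reuse` any more;
    // it works only with the returned object.
    process(record);
    // Point 2: the iterator must not have kept `record`, so the caller is
    // free to hand it back as the reuse object of the next call.
    reuse = record;
}
{code}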



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2016-02-12 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3394:
---
Fix Version/s: 1.0.0

> Clear up the contract of MutableObjectIterator.next(reuse)
> --
>
> Key: FLINK-3394
> URL: https://issues.apache.org/jira/browse/FLINK-3394
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> {{MutableObjectIterator.next(reuse)}} has the following contract (according 
> to [~StephanEwen]'s comment \[1\]):
> 1. The caller may not hold onto {{reuse}} any more
> 2. The iterator implementor may not hold onto the returned object any more.
> This should be documented in its javadoc (with "WARNING" so that people don't 
> overlook it).
> Additionally, since this was a "secret contract" up to now, all the 270 
> usages of {{MutableObjectIterator.next(reuse)}} should be checked for 
> violations. A few that are suspicious at first glance are in 
> {{CrossDriver}}, {{UnionWithTempOperator}}, 
> {{MutableHashTable.ProbeIterator.next}}, 
> {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}.
> \[1\] 
> https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-12 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15144755#comment-15144755
 ] 

Gabor Gevay commented on FLINK-3291:


{quote}
The initial idea of a contract for MutableObjectIterator.next(reuse) was the 
following:
1. The caller may not hold onto reuse any more
2. The iterator implementor may not hold onto the returned object any more.
Given that this was long ago (5 years probably, since I created that 
interface), I am pretty sure that contract is not obeyed everywhere.
{quote}

OK, this clears things up; thanks for chiming in, [~StephanEwen]!

Then the things we should do are
1. Add this contract to the javadoc of `MutableObjectIterator.next(reuse)`.
2. Go with [~greghogan]'s solution \[1\] to fix the problem brought up by this 
Jira.
3. Check all calls to `MutableObjectIterator.next(reuse)`. (A few suspicious 
ones are in `CrossDriver`, `UnionWithTempOperator`, 
`MutableHashTable.ProbeIterator.next`, and 
`ReusingBuildFirstHashJoinIterator.callWithNextKey`.)

{quote}
Gabor Gevay, at this point are the changes to MergeIterator fixing a bug? Do 
you want to fix up and clarify the documentation for MutableObjectIterator and 
verify the implementing classes?
{quote}
My changes to `MergeIterator` become unnecessary if the contracts of 
`MutableObjectIterator.next(reuse)` are what Stephan said.
I'll open a Jira for fixing the javadoc of `MutableObjectIterator.next(reuse)` 
to include the contract (and possibly also add this info to a new wiki page 
that will be linked from the new version of the object reuse documentation 
brewing at \[2\]), and check all call sites whether they obey the contract.

\[1\] https://github.com/apache/flink/pull/1626
\[2\] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2016-02-12 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3394:
---
Description: 
{{MutableObjectIterator.next(reuse)}} has the following contract (according to 
[~StephanEwen]'s comment \[1\]):

1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.

This should be documented in its javadoc (with "WARNING" so that people don't 
overlook it).

Additionally, since this was a "secret contract" up to now, all the 270 usages 
of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A 
few that are suspicious at first glance are in {{CrossDriver}}, 
{{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, 
{{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls in 
the reduce drivers are being fixed by https://github.com/apache/flink/pull/1626 
)

\[1\] 
https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654

  was:
{{MutableObjectIterator.next(reuse)}} has the following contract (according to 
[~StephanEwen]'s comment \[1\]):

1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.

This should be documented in its javadoc (with "WARNING" so that people don't 
overlook it).

Additionally, since this was a "secret contract" up to now, all the 270 usages 
of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A 
few that are suspicious at first glance are in {{CrossDriver}}, 
{{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, 
{{ReusingBuildFirstHashJoinIterator.callWithNextKey}}.

\[1\] 
https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654


> Clear up the contract of MutableObjectIterator.next(reuse)
> --
>
> Key: FLINK-3394
> URL: https://issues.apache.org/jira/browse/FLINK-3394
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> {{MutableObjectIterator.next(reuse)}} has the following contract (according 
> to [~StephanEwen]'s comment \[1\]):
> 1. The caller may not hold onto {{reuse}} any more
> 2. The iterator implementor may not hold onto the returned object any more.
> This should be documented in its javadoc (with "WARNING" so that people don't 
> overlook it).
> Additionally, since this was a "secret contract" up to now, all the 270 
> usages of {{MutableObjectIterator.next(reuse)}} should be checked for 
> violations. A few that are suspicious at first glance are in 
> {{CrossDriver}}, {{UnionWithTempOperator}}, 
> {{MutableHashTable.ProbeIterator.next}}, 
> {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls 
> in the reduce drivers are being fixed by 
> https://github.com/apache/flink/pull/1626 )
> \[1\] 
> https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3333:
---
Priority: Blocker  (was: Critical)

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139018#comment-15139018
 ] 

Gabor Gevay commented on FLINK-3333:


I have raised the priority to Blocker, because it seems that there is no 
consensus about what the contracts regarding object reuse should be. See
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139219#comment-15139219
 ] 

Gabor Gevay commented on FLINK-3333:


OK, this would also be a clear improvement over the current documentation.
However, this doesn't discuss the rules about whether I can modify output 
objects after returning them. This question is far from being trivial (as 
evidenced by the comment thread in the Google Doc). Do you think that this 
"edge case" is too insignificant to mention?

> I think it is most important that the user documentation be clear and 
> concise, otherwise it won't be read or will discourage new users.
This is why I think that the separation between the non-chained/chained cases 
should be left out of this. If we are aiming for simplicity here, then I really 
can't imagine a user meticulously checking whether his operator will be 
chained, and then writing different code based on this.

Other minor issues:

> In practice that means that a user function will always receive the same 
> object instance
"it can happen that" should be inserted. See point C) in the "Concrete problems 
with the current documentation" section in the Google Doc.

> User defined functions (like map() or groupReduce()) 
Should be "like MapFunction or GroupReduceFunction", to avoid confusing new 
users about what a user-defined function is.


> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139435#comment-15139435
 ] 

Gabor Gevay commented on FLINK-3333:


One more thing: if we decide to go with including chaining, then please also 
explain that a "chainable operator" means that it can be chained with the 
_previous_ operator.

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved

2016-02-08 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3333:
---
Priority: Major  (was: Minor)

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted

2016-02-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136812#comment-15136812
 ] 

Gabor Gevay commented on FLINK-3266:


These are two thread dumps, one before running the job, and one after:
http://pastebin.com/aT9zMf2f
http://pastebin.com/nLzicKad
The diff between them consists of these threads:
ForkJoinPool-4-worker-7
ForkJoinPool-1-worker-3
ForkJoinPool-3-worker-5
Timer-0
Timer-1

> LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
> --
>
> Key: FLINK-3266
> URL: https://issues.apache.org/jira/browse/FLINK-3266
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> After a job submitted to a LocalEnvironment finishes, some threads are not 
> stopped, and are stuck in waiting forever.
> You can observe this, if you enclose the body of the main function of the 
> WordCount example with a loop that executes 100 times, and monitor the thread 
> count (with VisualVM for example).
> (The problem only happens if I use a mini cluster. If I use start-local.sh 
> and submit jobs to it, then there is no leak.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3363) JobManager does not shut down dedicated executor properly

2016-02-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136813#comment-15136813
 ] 

Gabor Gevay commented on FLINK-3363:


Is this a different issue from https://issues.apache.org/jira/browse/FLINK-3266 
?

> JobManager does not shut down dedicated executor properly
> -
>
> Key: FLINK-3363
> URL: https://issues.apache.org/jira/browse/FLINK-3363
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The JobManager does not shutdown its dedicated executor thread pool.
> In standalone/Yarn setups, that is no issue. In local embedded execution, 
> this may leave threads running that are not needed any more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved

2016-02-08 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3333:
---
Priority: Critical  (was: Major)

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3333) Documentation about object reuse should be improved

2016-02-05 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3333:
---
Component/s: Documentation

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-05 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134152#comment-15134152
 ] 

Gabor Gevay commented on FLINK-3291:


@Greg, my current problem is that I'm not sure what the exact contract for 
`MutableObjectIterator.next(reuse)` should be. The fix that I have so far 
\[1\] makes 3b "yes" (see \[2\]), but it seems that you would rather go for 
"no", so that it is possible to handle object reuse by "exchanging the given 
object for an existing object" as you say in your comment \[3\]. Do you think 
that the performance gain (avoiding a copy) makes the complications that arise 
at every use of `MutableObjectIterator.next(reuse)` worthwhile?

\[1\] https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1
\[2\] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
\[3\] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java#L70

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3333) Documentation about object reuse should be improved

2016-02-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3333:
--

 Summary: Documentation about object reuse should be improved
 Key: FLINK-3333
 URL: https://issues.apache.org/jira/browse/FLINK-3333
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.0.0


The documentation about object reuse \[1\] has several problems, see \[2\].

\[1\] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior

\[2\] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3335) DataSourceTask object reuse when disabled

2016-02-04 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15132453#comment-15132453
 ] 

Gabor Gevay commented on FLINK-3335:


Are you sure that adding that copy is the best solution here? An alternative 
solution would be to specify in the javadoc of InputFormat.nextRecord that it 
should satisfy the same contract that also governs how UDFs like MapFunction 
treat objects that they return or give to Collector.collect. (See here: 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit)

> DataSourceTask object reuse when disabled
> -
>
> Key: FLINK-3335
> URL: https://issues.apache.org/jira/browse/FLINK-3335
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> From {{DataSourceTask.invoke()}}:
> {code}
> if ((returned = format.nextRecord(serializer.createInstance())) != null) {
> output.collect(returned);
> }
> {code}
> The returned value ({{returned}}) must be copied rather than creating and 
> passing in a new instance. The {{InputFormat}} interface only permits the 
> given object to be used and does not require a new object to be returned 
> otherwise.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-03 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15130452#comment-15130452
 ] 

Gabor Gevay commented on FLINK-3291:


OK, sorry, I think I have now understood what you meant by ReduceDriver.run not 
tracking the object returned from the iterator call. The problem here is that 
after 0a8df6d513fa59d650ff875bdf3a1613d0f14af5, I mustn't modify an object that 
I have given to an iterator.next call as a reuse object, because 
MergeIterator.HeadStream.nextHead saves a reference to it, and expects that 
object to not change. But this seems like a rather scary requirement, and I 
wouldn't be sure that some other code besides ReduceDriver doesn't also 
violate it somewhere.

I think that the root cause of these issues is that the documentation about 
object reuse [1] is rather inadequate in clearly stating what the contracts in 
this area are, so I tried to put together a Google Doc about this: [2]. Could 
you please look at it, and tell me how much it aligns with your way of thinking 
about object reuse?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
[2] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3322:
--

 Summary: MemoryManager creates too much GC pressure with iterative 
jobs
 Key: FLINK-3322
 URL: https://issues.apache.org/jira/browse/FLINK-3322
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay


When taskmanager.memory.preallocate is false (the default), released memory 
segments are not added to a pool, but the GC is expected to take care of them. 
This puts too much pressure on the GC with iterative jobs, where the operators 
reallocate all memory at every superstep.

See the following discussion on the mailing list:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html

Reproducing the issue:
https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
The class to start is malom.Solver. If you increase the memory given to the JVM 
from 1 to 50 GB, performance gradually degrades by more than 10 times. (It will 
generate some lookup tables to /tmp on first run for a few minutes.) (I think 
the slowdown might also depend somewhat on taskmanager.memory.fraction, because 
more unused non-managed memory results in rarer GCs.)
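
As a hedged workaround sketch (not a fix): preallocation can be enabled for a 
local run via the configuration. "taskmanager.memory.preallocate" is the option 
named above; the surrounding code is illustrative only.
{code}
Configuration conf = new Configuration();
conf.setBoolean("taskmanager.memory.preallocate", true);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
{code}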



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it

2016-02-02 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3321:
---
Labels: starter  (was: )

> TupleSerializerBase.getLength should know the length when all fields know it
> 
>
> Key: FLINK-3321
> URL: https://issues.apache.org/jira/browse/FLINK-3321
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> TupleSerializerBase.getLength currently always returns -1, but it could 
> actually know the length, when all the field serializers know their lengths 
> (since no null can appear anywhere in Tuples, nor can a subclass of Tuple 
> with additional fields appear).
> (The serializer knowing the exact size has various performance benefits, for 
> example see FixedLengthRecordSorter, or 
> CompactingHashTable.getInitialTableSize.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-02 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3322:
---
Fix Version/s: 1.0.0

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it

2016-02-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3321:
--

 Summary: TupleSerializerBase.getLength should know the length when 
all fields know it
 Key: FLINK-3321
 URL: https://issues.apache.org/jira/browse/FLINK-3321
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Gabor Gevay
Priority: Minor


TupleSerializerBase.getLength currently always returns -1, but it could 
actually know the length, when all the field serializers know their lengths 
(since no null can appear anywhere in Tuples, nor can a subclass of Tuple with 
additional fields appear).

(The serializer knowing the exact size has various performance benefits, for 
example see FixedLengthRecordSorter, or 
CompactingHashTable.getInitialTableSize.)
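
A hedged sketch of what such a getLength could look like (simplified; assumes 
a fieldSerializers array member like the one in TupleSerializerBase):
{code}
@Override
public int getLength() {
    // Fixed total length only if every field has a fixed length;
    // otherwise keep returning -1 (unknown / variable length).
    int total = 0;
    for (TypeSerializer<?> fieldSerializer : fieldSerializers) {
        int len = fieldSerializer.getLength();
        if (len < 0) {
            return -1;
        }
        total += len;
    }
    return total;
}
{code}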



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-02 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15128243#comment-15128243
 ] 

Gabor Gevay commented on FLINK-3291:


> `ReduceDriver.run` owns two objects but is not tracking the object returned 
> in `while ((value = input.next(reuse2)) != null) {`

The problem is more complicated than that: this can't be done with just 1 or 2 
reuse objects; each of the head streams needs to own an object itself.
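
A hedged sketch of that idea (heavily simplified, not the actual MergeIterator 
code):
{code}
// Each head stream owns its object and recycles the record returned by its
// inner iterator as the reuse instance of the next call, so no object coming
// from the outer caller is ever captured.
private final class HeadStream<E> {
    private final MutableObjectIterator<E> iterator;
    private E head; // owned by this head stream

    HeadStream(MutableObjectIterator<E> iterator, E initialReuse) {
        this.iterator = iterator;
        this.head = initialReuse; // e.g. from serializer.createInstance()
    }

    boolean nextHead() throws IOException {
        return (head = iterator.next(head)) != null;
    }
}
{code}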

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-02 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128225#comment-15128225
 ] 

Gabor Gevay commented on FLINK-3291:


I'm almost done with the fix, see here:
https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1

The only thing that is missing is adding a small test specifically for this 
problem. And I would like to additionally figure out why the tests that are 
already in place didn't catch this problem.

Btw, I couldn't avoid introducing an extra copy. (The solution that I wrote in 
the Jira description doesn't really work by itself.)

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-02-02 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128225#comment-15128225
 ] 

Gabor Gevay edited comment on FLINK-3291 at 2/2/16 1:14 PM:


I'm almost done with the fix, see here:
https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1

The only thing that is missing is adding a small test specifically for this 
problem. And I would like to additionally figure out why the tests that are 
already in place didn't catch this problem.

Btw, I couldn't avoid introducing an extra copy. (The solution that I wrote in 
the Jira description doesn't really work by itself.)


was (Author: ggevay):
I'm almost done with the fix, see here:
https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug-fix1

The only thing that is missing is adding a small test for specifically this 
problem. And I would like to additionally figure out why the test that are 
already in place didn't catch this problem.

Btw. I couldn't avoid introducing an extra copy. (Th solution that I wrote in 
the Jira description, doesn't really work in itself.)

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-01-26 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3291:
--

 Summary: Object reuse bug in MergeIterator.HeadStream.nextHead
 Key: FLINK-3291
 URL: https://issues.apache.org/jira/browse/FLINK-3291
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Critical


MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
`reuse` object that it got as an argument. This object might be modified later 
by the caller.

This actually happens when ReduceDriver.run calls input.next (which will 
actually be MergeIterator.next(E reuse)) in the inner while loop of the 
objectReuseEnabled branch, and that calls top.nextHead with the reference that 
it got from ReduceDriver, which erroneously saves the reference, and then 
ReduceDriver later uses that same object for doing the reduce.

Another way in which this fails is when MergeIterator.next(E reuse) gives 
`reuse` to different `top`s in different calls, and then the heads end up being 
the same object.

You can observe the latter situation in action by running ReducePerformance 
here:
https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
Set memory to -Xmx200m (so that the MergeIterator actually has merging to do), 
put a breakpoint at the beginning of MergeIterator.next(reuse), and then watch 
`reuse`, and the heads of the first two elements of `this.heap` in the 
debugger. They will get to be the same object after hitting continue about 6 
times.

You can also look at the count that is printed at the end, which shouldn't be 
larger than the key range. Also, if you look into the output file 
/tmp/xxxobjectreusebug, for example the key 77 appears twice.

The good news is that I think I can see an easy fix that doesn't affect 
performance: MergeIterator.HeadStream could have a reuse object of its own as a 
member, and give that to iterator.next in nextHead(E reuse). And then we 
wouldn't need the overload of nextHead that has the reuse parameter, and 
MergeIterator.next(E reuse) could just call its other overload.
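
A hedged sketch of that fix (the member names are assumptions layered on the 
description above, not the actual Flink code); MergeIterator.next(E reuse) 
would then copy the head into the caller's object, which is where the extra 
copy mentioned in the comments above comes from:

{code}
import java.io.IOException;

import org.apache.flink.util.MutableObjectIterator;

// A hedged sketch of the proposed fix, not the actual Flink code: each
// HeadStream owns its reuse object, so the reference that the caller
// (e.g. ReduceDriver) passes in is never captured by the merge logic.
final class HeadStreamSketch<E> {

    private final MutableObjectIterator<E> iterator;
    private E head;   // current head element of this stream
    private E reuse;  // owned by this stream, never handed out to callers

    HeadStreamSketch(MutableObjectIterator<E> iterator, E initialReuse) {
        this.iterator = iterator;
        this.reuse = initialReuse;
    }

    boolean nextHead() throws IOException {
        // The iterator may fill `reuse` in place or return another object;
        // either way, only objects owned by this stream end up in `head`.
        head = iterator.next(reuse);
        return head != null;
    }

    E getHead() {
        return head;
    }
}
{code}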



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-01-26 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3291:
---
Component/s: Distributed Runtime

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2246) Add chained combine driver strategy for ReduceFunction

2016-01-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115174#comment-15115174
 ] 

Gabor Gevay commented on FLINK-2246:


I have added a commit to https://github.com/apache/flink/pull/1517, which adds 
the chained version of ReduceCombineDriver.

> Add chained combine driver strategy for ReduceFunction
> --
>
> Key: FLINK-2246
> URL: https://issues.apache.org/jira/browse/FLINK-2246
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Gabor Gevay
>Priority: Minor
>
> Running the WordCount example with text file input/output and a manual 
> reduce function (instead of the sum(1)) results in a combiner, which is not 
> chained.
> Replace sum(1) with the following to reproduce and use a text file as input:
> {code}
> fileOutput = true;
> textPath = "...";
> outputPath = "...";
> {code}
> {code}
> .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> @Override
> public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
> Tuple2<String, Integer> value2) throws Exception {
> return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2246) Add chained combine driver strategy for ReduceFunction

2016-01-22 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-2246:
--

Assignee: Gabor Gevay

> Add chained combine driver strategy for ReduceFunction
> --
>
> Key: FLINK-2246
> URL: https://issues.apache.org/jira/browse/FLINK-2246
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Gabor Gevay
>Priority: Minor
>
> Running the WordCount example with text file input/output and a manual 
> reduce function (instead of the sum(1)) results in a combiner, which is not 
> chained.
> Replace sum(1) with the following to reproduce and use a text file as input:
> {code}
> fileOutput = true;
> textPath = "...";
> outputPath = "...";
> {code}
> {code}
> .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> @Override
> public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
> Tuple2<String, Integer> value2) throws Exception {
> return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted

2016-01-21 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110349#comment-15110349
 ] 

Gabor Gevay commented on FLINK-3266:


They have names like `ForkJoinPool-168-worker-3` and `Timer-13`. Here is a 
thread dump:
http://pastebin.com/h807AcHH

> LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
> --
>
> Key: FLINK-3266
> URL: https://issues.apache.org/jira/browse/FLINK-3266
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> After a job submitted to a LocalEnvironment finishes, some threads are not 
> stopped, and are stuck waiting forever.
> You can observe this if you enclose the body of the main function of the 
> WordCount example in a loop that executes 100 times and monitor the thread 
> count (with VisualVM, for example).
> (The problem only happens if I use a mini cluster. If I use start-local.sh 
> and submit jobs to it, then there is no leak.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted

2016-01-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108861#comment-15108861
 ] 

Gabor Gevay commented on FLINK-3266:


The threads staying alive are the most apparent leak, but some memory is 
leaked as well (maybe about 200 KB per job).

> LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
> --
>
> Key: FLINK-3266
> URL: https://issues.apache.org/jira/browse/FLINK-3266
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> After a job submitted to a LocalEnvironment finishes, some threads are not 
> stopped, and are stuck waiting forever.
> You can observe this if you enclose the body of the main function of the 
> WordCount example in a loop that executes 100 times and monitor the thread 
> count (with VisualVM, for example).
> (The problem only happens if I use a mini cluster. If I use start-local.sh 
> and submit jobs to it, then there is no leak.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted

2016-01-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3266:
--

 Summary: LocalFlinkMiniCluster leaks resources when multiple jobs 
are submitted
 Key: FLINK-3266
 URL: https://issues.apache.org/jira/browse/FLINK-3266
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Priority: Minor


After a job submitted to a LocalEnvironment finishes, some threads are not 
stopped, and are stuck waiting forever.

You can observe this if you enclose the body of the main function of the 
WordCount example in a loop that executes 100 times and monitor the thread 
count (with VisualVM, for example), as in the sketch below.

(The problem only happens if I use a mini cluster. If I use start-local.sh and 
submit jobs to it, then there is no leak.)
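
For illustration, a hedged repro sketch (a trimmed stand-in for the WordCount 
loop described above, not the exact code used):

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

// A hedged repro sketch: submit many jobs to a LocalEnvironment and watch
// the thread count climb in VisualVM.
public class MiniClusterLeakRepro {
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
            long count = env.fromElements(1, 2, 3).count(); // count() submits a job
            System.out.println("job " + i + " counted " + count + " elements");
        }
    }
}
{code}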



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds

2015-12-13 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3166:
--

 Summary: The first program in ObjectReuseITCase has the wrong 
expected result, and it succeeds
 Key: FLINK-3166
 URL: https://issues.apache.org/jira/browse/FLINK-3166
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay
Priority: Critical


The first program in ObjectReuseITCase has the following input:
a,1
a,2
a,3
a,4
a,50
There is a groupBy on field 0, and then a reduce, so the result should be 
1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the 
Flink program also produces this.

The problem is caused by mismatched assumptions between 
ReduceCombineDriver.sortAndCombine and the ReduceFunction in the test about 
the object reuse rules of ReduceFunctions:

ReduceCombineDriver.sortAndCombine has the following comment:
"The user function is expected to return the first input as the result."
The ReduceFunction in the test, however, modifies and returns the second 
input. (And the second program in the test has the same problem.)

I can't find the assumption that is stated in the comment in any documentation. 
For example, the javadoc of ReduceFunction should make the user aware of this. 
Or, alternatively, the code of the driver should be modified to not make this 
assumption. I'm not sure which solution is better.
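
For illustration, a hedged sketch of a ReduceFunction that violates the 
driver's assumption (this is not the actual test code):

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// A hedged illustration, not the actual test code: this function mutates
// and returns the SECOND input, which breaks sortAndCombine's assumption
// that the first input comes back as the result.
public class SecondInputReducer implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(
            Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
        value2.f1 = value1.f1 + value2.f1; // modifies the second input...
        return value2;                     // ...and returns it
    }
}
{code}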





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds

2015-12-13 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3166:
---
Component/s: Tests

> The first program in ObjectReuseITCase has the wrong expected result, and it 
> succeeds
> -
>
> Key: FLINK-3166
> URL: https://issues.apache.org/jira/browse/FLINK-3166
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Gabor Gevay
>Priority: Critical
>
> The first program in ObjectReuseITCase has the following input:
> a,1
> a,2
> a,3
> a,4
> a,50
> There is a groupBy on field 0, and then a reduce, so the result should be 
> 1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the 
> Flink program also produces this.
> The problem is caused by mismatched assumptions between 
> ReduceCombineDriver.sortAndCombine and the ReduceFunction in the test about 
> the object reuse rules of ReduceFunctions:
> ReduceCombineDriver.sortAndCombine has the following comment:
> "The user function is expected to return the first input as the result."
> The ReduceFunction in the test, however, modifies and returns the second 
> input. (And the second program in the test has the same problem.)
> I can't find the assumption that is stated in the comment in any 
> documentation. For example, the javadoc of ReduceFunction should make the 
> user aware of this. Or, alternatively, the code of the driver should be 
> modified to not make this assumption. I'm not sure which solution is better.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds

2015-12-13 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3166:
---
Component/s: Documentation
 Distributed Runtime

> The first program in ObjectReuseITCase has the wrong expected result, and it 
> succeeds
> -
>
> Key: FLINK-3166
> URL: https://issues.apache.org/jira/browse/FLINK-3166
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime, Documentation, Tests
>Reporter: Gabor Gevay
>Priority: Critical
>
> The first program in ObjectReuseITCase has the following input:
> a,1
> a,2
> a,3
> a,4
> a,50
> There is a groupBy on field 0, and then a reduce, so the result should be 
> 1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the 
> Flink program also produces this.
> The problem is caused by mismatched assumptions between 
> ReduceCombineDriver.sortAndCombine and the ReduceFunction in the test about 
> the object reuse rules of ReduceFunctions:
> ReduceCombineDriver.sortAndCombine has the following comment:
> "The user function is expected to return the first input as the result."
> The ReduceFunction in the test, however, modifies and returns the second 
> input. (And the second program in the test has the same problem.)
> I can't find the assumption that is stated in the comment in any 
> documentation. For example, the javadoc of ReduceFunction should make the 
> user aware of this. Or, alternatively, the code of the driver should be 
> modified to not make this assumption. I'm not sure which solution is better.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-11-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15006330#comment-15006330
 ] 

Gabor Gevay commented on FLINK-2662:


Hi,
Unfortunately, I haven't made any progress. It is still reproducible; I have 
now rebased onto the current master, and the same thing happens when I try it 
on my home laptop. This is the rebased branch:
https://github.com/ggevay/flink/tree/plan-generation-bug-rebased

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10
>Reporter: Gabor Gevay
> Fix For: 0.10
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2757) DataSinkTaskTest fails on Windows

2015-11-12 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15002124#comment-15002124
 ] 

Gabor Gevay commented on FLINK-2757:


This might be related to https://issues.apache.org/jira/browse/FLINK-2369.

> DataSinkTaskTest fails on Windows
> -
>
> Key: FLINK-2757
> URL: https://issues.apache.org/jira/browse/FLINK-2757
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
> Environment: Windows 10, Cygwin, "mvn clean install"
>Reporter: Fabian Hueske
> Fix For: 0.10
>
>
> The following tests of {{DataSinkTaskTest}} fail when calling {{mvn clean 
> install}} in Cygwin on Windows 10:
> {code}
> DataSinkTaskTest.testFailingDataSinkTask: Temp output file has not been 
> removed
> DataSinkTaskTest.testFailingSortingDataSinkTask: Temp output file has not 
> been removed
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-10-28 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14978124#comment-14978124
 ] 

Gabor Gevay commented on FLINK-2662:


It is working on my work laptop...

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10
>Reporter: Gabor Gevay
> Fix For: 0.10
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-10-28 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14978675#comment-14978675
 ] 

Gabor Gevay commented on FLINK-2662:


Yes, because I checked out the branch that is referred to here in both cases.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10
>Reporter: Gabor Gevay
> Fix For: 0.10
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-10-27 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14976982#comment-14976982
 ] 

Gabor Gevay commented on FLINK-2662:


That's strange. I have also tried to reproduce this now, and it is still giving 
the same error for me. Do you have any idea as to what characteristic of my 
machine might influence the plan generation in such a way that this error 
happens only for me?

Tomorrow I will try this on my work laptop as well, and report back.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10
>Reporter: Gabor Gevay
> Fix For: 0.10
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2847) Fix flaky test in StormTestBase.testJob

2015-10-10 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2847:
---
Labels: test-stability  (was: )

> Fix flaky test in StormTestBase.testJob
> ---
>
> Key: FLINK-2847
> URL: https://issues.apache.org/jira/browse/FLINK-2847
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Henry Saputra
>Priority: Minor
>  Labels: test-stability
>
> {code}
> testJob(org.apache.flink.storm.wordcount.WordCountLocalITCase)  Time elapsed: 
> 12.845 sec  <<< FAILURE!
> java.lang.AssertionError: Different number of lines in expected and obtained 
> result. expected:<801> but was:<0>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:555)
>   at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:305)
>   at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:291)
>   at 
> org.apache.flink.storm.wordcount.WordCountLocalITCase.postSubmit(WordCountLocalITCase.java:38)
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.815 sec - 
> in org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNamesITCase
> Running org.apache.flink.storm.wordcount.BoltTokenizerWordCountITCase
> Running org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojoITCase
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.55 sec - in 
> org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojoITCase
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.801 sec - 
> in org.apache.flink.storm.wordcount.BoltTokenizerWordCountITCase
> Results :
> Failed tests: 
>   
> WordCountLocalITCase>StormTestBase.testJob:98->postSubmit:38->TestBaseUtils.compareResultsByLinesInMemory:291->TestBaseUtils.compareResultsByLinesInMemory:305
>  Different number of lines in expected and obtained result. expected:<801> 
> but was:<0>
> Tests run: 11, Failures: 1, Errors: 0, Skipped: 0
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2839) Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics

2015-10-09 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2839:
--

 Summary: Failing test: 
OperatorStatsAccumulatorTest.testAccumulatorAllStatistics
 Key: FLINK-2839
 URL: https://issues.apache.org/jira/browse/FLINK-2839
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Reporter: Gabor Gevay
Priority: Minor


I saw this test failure:

{code}
Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.633 sec <<< 
FAILURE! - in 
org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest
testAccumulatorAllStatistics(org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest)
  Time elapsed: 1.5 sec  <<< FAILURE!
java.lang.AssertionError: The total number of heavy hitters should be between 0 
and 5.
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest.testAccumulatorAllStatistics(OperatorStatsAccumulatorTest.java:151)
{code}

Full log 
[here|https://s3.amazonaws.com/archive.travis-ci.org/jobs/84469788/log.txt].

Maybe the test should set a constant seed on the {{Random}} object, as in the 
sketch below.
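
A minimal sketch of that change (the field name is an assumption, not the 
actual test code):

{code}
import java.util.Random;

// A minimal sketch, field name assumed: a fixed seed makes the generated
// data, and hence the heavy-hitter counts, deterministic across runs.
public class OperatorStatsAccumulatorTestSketch {
    private final Random rnd = new Random(42L);
}
{code}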



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2818) The *ReduceDriver classes have incorrect javadocs

2015-10-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2818:
--

 Summary: The *ReduceDriver classes have incorrect javadocs
 Key: FLINK-2818
 URL: https://issues.apache.org/jira/browse/FLINK-2818
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial


The javadocs of ReduceDriver, AllReduceDriver, AllGroupReduceDriver, 
GroupReduceDriver, GroupReduceCombineDriver, and GroupCombineChainedDriver are 
outdated and/or copy-pasted-then-not-correctly-modified, which makes 
deciphering what all these classes are more difficult.

PR coming shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2237) Add hash-based Aggregation

2015-10-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-2237:
--

Assignee: Gabor Gevay

> Add hash-based Aggregation
> --
>
> Key: FLINK-2237
> URL: https://issues.apache.org/jira/browse/FLINK-2237
> Project: Flink
>  Issue Type: New Feature
>Reporter: Rafiullah Momand
>Assignee: Gabor Gevay
>Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2809) DataSet[Unit] doesn't work

2015-10-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2809:
--

 Summary: DataSet[Unit] doesn't work
 Key: FLINK-2809
 URL: https://issues.apache.org/jira/browse/FLINK-2809
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The following code creates a DataSet\[Unit\]:

{code}
val env = ExecutionEnvironment.createLocalEnvironment()
val a = env.fromElements(1,2,3)
val b = a.map (_ => ())
b.writeAsText("/tmp/xxx")
env.execute()
{code}

This doesn't work, because a VoidSerializer is created, which can't cope with a 
BoxedUnit. See exception below.

I'm now thinking about creating a UnitSerializer class.



org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast 
to java.lang.Void
at 
org.apache.flink.api.common.typeutils.base.VoidSerializer.serialize(VoidSerializer.java:26)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at 
org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2806) No TypeInfo for Scala's Nothing type

2015-10-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2806:
--

 Summary: No TypeInfo for Scala's Nothing type
 Key: FLINK-2806
 URL: https://issues.apache.org/jira/browse/FLINK-2806
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


When writing some generic code, I encountered a situation where I needed a 
TypeInformation[Nothing]. Two problems prevent me from getting it:
1. TypeInformationGen.mkTypeInfo doesn't return a real 
TypeInformation[Nothing]. (It actually returns a casted null in that case.)
2. The line
{code}
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
{code}
does not fire in some situations when it should, when T = Nothing. (I guess 
this is a compiler bug.)

I will open a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2237) Add hash-based Aggregation

2015-09-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14909174#comment-14909174
 ] 

Gabor Gevay commented on FLINK-2237:


Couldn't the CompactingHashTable be reused here? The driver would get a prober 
from it, and then for each incoming record, the driver would call getMatchFor, 
then do one step of the reduce, and then write the result back with updateMatch.
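
A hedged sketch of that loop; getMatchFor and updateMatch are the prober API 
described above, while the wiring around them is assumed rather than actual 
Flink code:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.MutableObjectIterator;

// A hedged sketch of the proposed hash-based combine loop; the setup of the
// table, prober, and reuse objects is assumed, not actual Flink code.
final class HashAggregationSketch {
    static <T> void combine(
            MutableObjectIterator<T> input,
            CompactingHashTable<T> table,
            AbstractHashTableProber<T, T> prober,
            ReduceFunction<T> reducer,
            T record,
            T reuse) throws Exception {
        while ((record = input.next(record)) != null) {
            T match = prober.getMatchFor(record, reuse);
            if (match == null) {
                table.insert(record); // first value seen for this key
            } else {
                // one step of the reduce, written back into the table
                prober.updateMatch(reducer.reduce(match, record));
            }
        }
    }
}
{code}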

I'm interested in this feature because I have a computation where it would 
really make a huge difference: I'm calling flatMap on an already large data 
set, which blows it up to about 10 times the size, and then groupBy and reduce. 
If Flink had hash-based aggregation, then the result of the flatMap wouldn't 
need to be materialized, which would reduce the memory requirement to about 
1/10.

> Add hash-based Aggregation
> --
>
> Key: FLINK-2237
> URL: https://issues.apache.org/jira/browse/FLINK-2237
> Project: Flink
>  Issue Type: New Feature
>Reporter: Rafiullah Momand
>Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-09-12 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2662:
--

 Summary: CompilerException: "Bug: Plan generation for Unions 
picked a ship strategy between binary plan operators."
 Key: FLINK-2662
 URL: https://issues.apache.org/jira/browse/FLINK-2662
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: master
Reporter: Gabor Gevay


I have a Flink program which throws the exception in the jira title. Full text:


Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
Plan generation for Unions picked a ship strategy between binary plan operators.
at 
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
at 
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
at 
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at 
org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
at 
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
at 
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at 
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
at 
org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
at 
org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
at malom.Solver.main(Solver.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


The execution plan:
http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
(I obtained this by commenting out the line that throws the exception)


The code is here:
https://github.com/ggevay/flink/tree/plan-generation-bug
The class to run is "Solver". It needs a command line argument, which is a 
directory where it would write output. (On first run, it generates some 
lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2181) SessionWindowing example has a memleak

2015-09-09 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2181:
---
Assignee: (was: Gabor Gevay)

> SessionWindowing example has a memleak
> --
>
> Key: FLINK-2181
> URL: https://issues.apache.org/jira/browse/FLINK-2181
> Project: Flink
>  Issue Type: Bug
>  Components: Examples, Streaming
>Reporter: Gabor Gevay
>
> The trigger policy objects belonging to already terminated sessions are kept 
> in memory, and also NotifyOnLastGlobalElement gets called on them. This 
> causes the program to eat up more and more memory, and also to get slower 
> with time.
> Mailing list discussion:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset

2015-08-23 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-2548.

Resolution: Won't Fix

 In a VertexCentricIteration, the run time of one iteration should be 
 proportional to the size of the workset
 

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.
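
For illustration, a hedged and much-simplified sketch of the proposed 
join-then-groupBy shape (plain Long vertex IDs and toy data; this is not 
Gelly code):

{code}
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// A hedged, simplified illustration (not Gelly code): joining the workset
// against the edges touches only the matching edge groups, so the cost of
// one iteration follows the workset size rather than the whole edge set.
public class JoinInsteadOfCoGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                new Tuple2<Long, Long>(1L, 2L),
                new Tuple2<Long, Long>(2L, 3L));
        DataSet<Tuple2<Long, Double>> workset = env.fromElements(
                new Tuple2<Long, Double>(1L, 0.5));

        DataSet<Tuple2<Long, Double>> messages = workset
                .join(edges).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<Long, Double>,
                        Tuple2<Long, Long>, Tuple2<Long, Double>>() {
                    @Override
                    public void join(Tuple2<Long, Double> vertex,
                                     Tuple2<Long, Long> edge,
                                     Collector<Tuple2<Long, Double>> out) {
                        // send the vertex value along the out-edge
                        out.collect(new Tuple2<Long, Double>(edge.f1, vertex.f1));
                    }
                })
                .groupBy(0)
                .sum(1); // combine the messages arriving at each vertex

        messages.print();
    }
}
{code}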



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset

2015-08-23 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708364#comment-14708364
 ] 

Gabor Gevay commented on FLINK-2548:


OK, you are probably right.

I ran some more tests, and it seems that the issue in my use case is more with 
the serialization. In other cases, when the serialization of the vertex IDs is 
cheaper, then the coGroup implementation does OK with respect to the run time 
of one iteration following the workset size.

 In a VertexCentricIteration, the run time of one iteration should be 
 proportional to the size of the workset
 

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset

2015-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2548:
---
Summary: In a VertexCentricIteration, the run time of one iteration should 
be proportional to the size of the workset  (was: VertexCentricIteration should 
avoid doing a coGroup with the edges and the solution set)

 In a VertexCentricIteration, the run time of one iteration should be 
 proportional to the size of the workset
 

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset

2015-08-21 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706579#comment-14706579
 ] 

Gabor Gevay commented on FLINK-2548:


I changed the title of the issue to refer to the goal instead of the above 
proposed solution approach, because I'm not sure anymore that this is the best 
approach to achieve this.

My problem with the join + reduceGroup approach is that there are cases where 
it is a little slower (by a few tens of percent) than the coGroup, for example 
when the edges data set doesn't fit into memory, and probably also when the 
workset contains close to all the vertices.

Another approach would be to extend the coGroup operation with an option 
that makes it call the UDF only for the matching keys (inner-join semantics); 
the optimizer would then consider using a hash table instead of sorting when 
this option is set. But I am not sure how much work it would be to implement this.
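
For illustration, the only way to get inner-join semantics out of coGroup today is to filter inside the UDF, which does not help with the cost, because the UDF is still invoked for every key group; a hedged sketch (the tuple layouts are assumptions):

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Emulating "inner" coGroup semantics inside the UDF. The group is skipped
// logically, but the function still runs for every edge group, which is
// exactly the cost that the proposed option would eliminate.
public class InnerCoGroupSketch implements
        CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public void coGroup(Iterable<Tuple2<Long, Long>> edges,
                        Iterable<Tuple2<Long, Double>> messages,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // Materialize the (typically small) message side, so that we can test
        // for emptiness and traverse it once per edge.
        List<Tuple2<Long, Double>> msgs = new ArrayList<Tuple2<Long, Double>>();
        for (Tuple2<Long, Double> m : messages) {
            msgs.add(m);
        }
        if (msgs.isEmpty()) {
            return; // an inner coGroup would never have called us for this key
        }
        for (Tuple2<Long, Long> edge : edges) {
            for (Tuple2<Long, Double> m : msgs) {
                out.collect(new Tuple2<Long, Double>(edge.f1, m.f1));
            }
        }
    }
}
{code}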

 In a VertexCentricIteration, the run time of one iteration should be 
 proportional to the size of the workset
 

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set

2015-08-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704730#comment-14704730
 ] 

Gabor Gevay commented on FLINK-2548:


{quote}
CoGrouping with the solution set is a different runtime operator than the 
regular CoGroup.
{quote}
I see, thanks! I think this should be documented in the iterations programming 
guide, and in the javadoc for coGroup and getSolutionSet.
{quote}
(2) To have an iterator for the message sending, you can group by source 
vertex. Then you group again by target vertex, to have an iterator for incoming 
messages (either reduce+join or CoGroup). So you have now join + reduce + 
reduce + join in the iteration?
{quote}
For calling the MessagingFunction, I join the edges with the workset, and then 
do a reduceGroup by source vertex. This gives me an iterator over the neighbors 
inside the UDF of the reduceGroup. (Well, almost. That iterator is a little 
different from what the messaging function expects, so I wrote an iterator 
adaptor.)
See my code \[1\], and here is an execution plan which involves the new code: 
\[2\]. I think this code is memory-safe in the way you described.

\[1\] 
https://github.com/apache/flink/compare/master...ggevay:gelly-remove-coGroup-first-version
\[2\] http://compalg.inf.elte.hu/~ggevay/flink/Plan_join_groupBy.txt
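
The iterator adaptor mentioned above could look roughly like this (a hedged sketch; the actual code in [1] may differ, and the (source, target, value) tuple layout is an assumption):

{code}
import java.util.Iterator;

import org.apache.flink.api.java.tuple.Tuple3;

// Adapts the (src, trg, msgValue) triples coming out of the reduceGroup to the
// plain neighbor-ID iterator that the messaging function expects.
public class NeighborIterator implements Iterator<Long> {
    private final Iterator<Tuple3<Long, Long, Double>> joined;

    public NeighborIterator(Iterator<Tuple3<Long, Long, Double>> joined) {
        this.joined = joined;
    }

    @Override
    public boolean hasNext() {
        return joined.hasNext();
    }

    @Override
    public Long next() {
        return joined.next().f1; // expose only the target vertex ID
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
{code}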

 VertexCentricIteration should avoid doing a coGroup with the edges and the 
 solution set
 ---

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set

2015-08-19 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703423#comment-14703423
 ] 

Gabor Gevay commented on FLINK-2548:


{quote}
Actually, the second co-group is not a real co-group. It only queries the 
solution set for the vertices that have a message.
{quote}
Can you please explain how it achieves this? Looking at the code, I can't 
understand how it avoids being called on every vertex.
{quote}
We used coGroup initially, because it fits the pregel model where you have an 
iterator over your neighbors.
{quote}
I do a groupBy after the join, so I have the same iterator.
{quote}
This could be realized by a join as well, although it is hard to realize that 
in a memory-safe fashion.
{quote}
What exactly do you mean here by memory-safe? I see one drawback of the 
join-then-groupBy approach memory-wise: the workset vertex-value tuples get 
replicated as many times as the vertex's out-degree. Did you mean this 
problem?
{quote}
Breaking this into three UDFs (Scatter / Gather / Apply), implemented as (Join, 
Reduce, Join) would work and give the efficiency you seek. The 
ConnectedComponents example follows pretty much that pattern.
{quote}
Thanks, I will look into this.

 VertexCentricIteration should avoid doing a coGroup with the edges and the 
 solution set
 ---

 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay

 Currently, the performance of vertex centric iteration is suboptimal in those 
 iterations where the workset is small, because the complexity of one 
 iteration contains the number of edges and vertices of the graph because of 
 coGroups:
 VertexCentricIteration.buildMessagingFunction does a coGroup between the 
 edges and the workset, to get the neighbors to the messaging UDF. This is 
 problematic from a performance point of view, because the coGroup UDF gets 
 called on all the edge groups, including those that are not getting any 
 messages.
 An analogous problem is present in 
 VertexCentricIteration.createResultSimpleVertex at the creation of the 
 updates: a coGroup happens between the messages and the solution set, which 
 has the number of vertices of the graph included in its complexity.
 Both of these coGroups could be avoided by doing a join instead (with the 
 same keys that the coGroup uses), and then a groupBy. The complexity of these 
 operations would be dominated by the size of the workset, as opposed to the 
 number of edges or vertices of the graph. The joins should have the edges and 
 the solution set at the build side to achieve this complexity. (They will not 
 be rebuilt at every iteration.)
 I made some experiments with this, and the initial results seem promising. On 
 some workloads, this achieves a 2 times speedup, because later iterations 
 often have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set

2015-08-19 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2548:
--

 Summary: VertexCentricIteration should avoid doing a coGroup with 
the edges and the solution set
 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay


Currently, the performance of vertex centric iteration is suboptimal in those 
iterations where the workset is small, because the complexity of one iteration 
contains the number of edges and vertices of the graph because of coGroups:

VertexCentricIteration.buildMessagingFunction does a coGroup between the edges 
and the workset, to get the neighbors to the messaging UDF. This is problematic 
from a performance point of view, because the coGroup UDF gets called on all 
the edge groups, including those that are not getting any messages.

An analogous problem is present in 
VertexCentricIteration.createResultSimpleVertex at the creation of the updates: 
a coGroup happens between the messages and the solution set, which has the 
number of vertices of the graph included in its complexity.

Both of these coGroups could be avoided by doing a join instead (with the same 
keys that the coGroup uses), and then a groupBy. The complexity of these 
operations would be dominated by the size of the workset, as opposed to the 
number of edges or vertices of the graph. The joins should have the edges and 
the solution set at the build side to achieve this complexity. (They will not 
be rebuilt at every iteration.)

I made some experiments with this, and the initial results seem promising. On 
some workloads, this achieves a 2 times speedup, because later iterations often 
have quite small worksets, and these get a huge speedup from this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2015-08-18 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2540:
---
Component/s: Core

 LocalBufferPool.requestBuffer gets into infinite loop
 -

 Key: FLINK-2540
 URL: https://issues.apache.org/jira/browse/FLINK-2540
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Gabor Gevay

 I'm trying to run a complicated computation that looks like this: [1].
 One of the DataSource-Filter-Map chains finishes fine, but the other one 
 freezes. Debugging shows that it is spinning in the while loop in 
 LocalBufferPool.requestBuffer.
 askToRecycle is false. Both numberOfRequestedMemorySegments and 
 currentPoolSize are 128, so it never enters that if, either.
 This is a stack trace: [2]
 And here is the code, if you would like to run it: [3]. Unfortunately, I 
 can't make it more minimal, because if I remove some operators, the problem 
 disappears. The class to start is malom.Solver. (On first run, it calculates 
 some lookup tables for a few minutes, and puts them into /tmp/movegen)
 [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
 [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
 [3] https://github.com/ggevay/flink/tree/deadlock-malom



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2015-08-18 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2540:
--

 Summary: LocalBufferPool.requestBuffer gets into infinite loop
 Key: FLINK-2540
 URL: https://issues.apache.org/jira/browse/FLINK-2540
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay


I'm trying to run a complicated computation that looks like this: [1].
One of the DataSource-Filter-Map chains finishes fine, but the other one 
freezes. Debugging shows that it is spinning in the while loop in 
LocalBufferPool.requestBuffer.

askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize 
are 128, so it never enters that if, either.

This is a stack trace: [2]

And here is the code, if you would like to run it: [3]. Unfortunately, I can't 
make it more minimal, because if I remove some operators, the problem 
disappears. The class to start is malom.Solver. (On first run, it calculates 
some lookup tables for a few minutes, and puts them into /tmp/movegen)

[1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
[2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
[3] https://github.com/ggevay/flink/tree/deadlock-malom



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2015-08-18 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2540:
---
Fix Version/s: 0.9.1
   0.10

 LocalBufferPool.requestBuffer gets into infinite loop
 -

 Key: FLINK-2540
 URL: https://issues.apache.org/jira/browse/FLINK-2540
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Gabor Gevay
 Fix For: 0.10, 0.9.1


 I'm trying to run a complicated computation that looks like this: [1].
 One of the DataSource-Filter-Map chains finishes fine, but the other one 
 freezes. Debugging shows that it is spinning in the while loop in 
 LocalBufferPool.requestBuffer.
 askToRecycle is false. Both numberOfRequestedMemorySegments and 
 currentPoolSize are 128, so it never enters that if, either.
 This is a stack trace: [2]
 And here is the code, if you would like to run it: [3]. Unfortunately, I 
 can't make it more minimal, because if I remove some operators, the problem 
 disappears. The class to start is malom.Solver. (On first run, it calculates 
 some lookup tables for a few minutes, and puts them into /tmp/movegen)
 [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
 [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
 [3] https://github.com/ggevay/flink/tree/deadlock-malom



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2542) It should be documented that it is required from a join key to override hashCode(), when it is not a POJO

2015-08-18 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2542:
--

 Summary: It should be documented that it is required from a join 
key to override hashCode(), when it is not a POJO
 Key: FLINK-2542
 URL: https://issues.apache.org/jira/browse/FLINK-2542
 Project: Flink
  Issue Type: Bug
  Components: Gelly, Java API
Reporter: Gabor Gevay
Priority: Minor
 Fix For: 0.10, 0.9.1


If the join key is not a POJO, and does not override hashCode, then the join 
silently fails (produces empty output). I don't see this documented anywhere.

The Gelly documentation should also have this info separately, because it does 
joins internally on the vertex IDs, but the user might not know this, or might 
not look at the join documentation when using Gelly.

Here is an example code:

{noformat}
public static class ID implements Comparable<ID> {
    public long foo;

    // no default ctor -- not a POJO

    public ID(long foo) {
        this.foo = foo;
    }

    @Override
    public int compareTo(ID o) {
        return ((Long) foo).compareTo(o.foo);
    }

    @Override
    public boolean equals(Object o0) {
        if (o0 instanceof ID) {
            ID o = (ID) o0;
            return foo == o.foo;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return 42;
    }
}


public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<ID, Long>> inDegrees = env.fromElements(Tuple2.of(new ID(123l), 4l));
    DataSet<Tuple2<ID, Long>> outDegrees = env.fromElements(Tuple2.of(new ID(123l), 5l));

    DataSet<Tuple3<ID, Long, Long>> degrees = inDegrees.join(outDegrees,
            JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
            .with(new FlatJoinFunction<Tuple2<ID, Long>, Tuple2<ID, Long>, Tuple3<ID, Long, Long>>() {
                @Override
                public void join(Tuple2<ID, Long> first, Tuple2<ID, Long> second,
                        Collector<Tuple3<ID, Long, Long>> out) {
                    out.collect(new Tuple3<ID, Long, Long>(first.f0, first.f1, second.f1));
                }
            }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");

    System.out.println("degrees count: " + degrees.count());
}
{noformat}


This prints 1, but if I comment out the hashCode, it prints 0.
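
For reference, a more conventional hashCode for the ID class above, consistent with equals, would be the following sketch (the constant 42 above is also correct, it just puts every key into the same bucket):

{code}
@Override
public int hashCode() {
    return (int) (foo ^ (foo >>> 32)); // the same recipe that Long.hashCode uses
}
{code}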



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2527:
--

 Summary: If a VertexUpdateFunction calls setNewVertexValue more 
than once, the MessagingFunction will only see the first value set
 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


The problem is that if setNewVertexValue is called more than once, it sends 
each new value to the out Collector, and these all end up in the workset, but 
then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only 
the first value in the state Iterable. I see three ways to resolve this:

1. Add it to the documentation that setNewVertexValue should only be called 
once, and optionally add a check for this.
2. In setNewVertexValue, do not send the newValue to the out Collector at once, 
but only record it in outVal, and send the last recorded value after 
updateVertex returns.
3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and 
MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some 
documentation addition.)

I like 2. the best. What are your opinions?
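
A hedged sketch of what option 2 could look like (all names and types here are illustrative assumptions, not the actual VertexCentricIteration code):

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Buffer the value instead of collecting it eagerly, and emit at most once.
public class BufferingVertexUpdater {
    private final Tuple2<Long, Double> outVal = new Tuple2<Long, Double>();
    private boolean valueSet = false;

    public void setNewVertexValue(double newValue) {
        outVal.f1 = newValue; // record only; nothing reaches the workset yet
        valueSet = true;
    }

    // The framework would call this once after updateVertex(...) returns, so
    // only the last recorded value ends up in the workset.
    public void emitIfUpdated(long vertexId, Collector<Tuple2<Long, Double>> out) {
        if (valueSet) {
            outVal.f0 = vertexId;
            out.collect(outVal);
            valueSet = false;
        }
    }
}
{code}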



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698423#comment-14698423
 ] 

Gabor Gevay commented on FLINK-2527:


I'm also leaning towards (1) now. I have actually implemented (2) in the 
meantime, but then I realized that it sets a subtle trap for the user, that I 
immediately fell into :)

In my user function, I have a loop over the msgs, and for each msg I decide to 
set some new vertex value or not. This loop might set a new value multiple 
times, and the last one should be retained.

At first, I liked (2) better, because if we have (1), then I essentially have 
to implement (2) inside my user function anyway. I thought that this situation 
is probably a common one, so why have everyone reimplement (2) inside their 
user functions, if we can do it in Gelly? However, the trap is that my code 
inside the loop implicitly assumed that the setNewVertexValue function updates 
the vertex variable (the first parameter of the UDF), but it does not. Of 
course, we could make the setNewVertexValue do this update, but this is getting 
complicated. So it is probably just best to go with (1), to keep the API nice 
and simple.
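
With option (1), the loop described above would buffer the candidate itself and call setNewVertexValue once at the end; a hedged sketch (the Double message type and the per-message decision are assumptions):

{code}
// Helper showing the "remember the last candidate, set once" pattern.
public class SetOnceHelper {
    /** Returns the last value that should be set, or null if nothing changed. */
    static Double lastValueToSet(Iterable<Double> msgs, double currentValue) {
        Double candidate = null;
        for (double msg : msgs) {
            if (msg < currentValue) { // whatever decision the UDF makes per message
                currentValue = msg;
                candidate = msg;      // remember it; don't set it yet
            }
        }
        return candidate; // caller invokes setNewVertexValue(candidate) once if non-null
    }
}
{code}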

 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2447:
--

 Summary: TypeExtractor returns wrong type info when a Tuple has 
two fields of the same POJO type
 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay


Consider the following code:

{code}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
    @Override
    public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
        return null;
    }
});
{code}

where FooBarPojo is the following type:

{code}
public class FooBarPojo {
    public int foo, bar;
    public FooBarPojo() {}
}
{code}

This should print a tuple type with two identical fields:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>

But it prints the following instead:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>

Note that this problem causes some co-groups in Gelly to crash with 
"org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other" when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, while the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that detects 
recursive types (see the "alreadySeen" field in TypeExtractor): it mistakes 
the second appearance of FooBarPojo for a recursive field.

Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to 
process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls 
itself for the first field, which proceeds into the privateGetForClass case, 
which correctly detects that it is a POJO and correctly returns a 
PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field and goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is 
being processed.

(Note that if we comment out the recursive type detection (the lines that work 
with the alreadySeen field), then the output is correct.)
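
A quick way to observe this (a sketch under the assumptions of the snippet above):

{code}
System.out.println(d2.getType()); // shows GenericType<FooBarPojo> as the second field
{code}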



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2447:
---
Description: 
Consider the following code:

{code}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
    @Override
    public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
        return null;
    }
});
{code}

where FooBarPojo is the following type:

{code}
public class FooBarPojo {
    public int foo, bar;
    public FooBarPojo() {}
}
{code}

This should print a tuple type with two identical fields:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>

But it prints the following instead:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>

Note that this problem causes some co-groups in Gelly to crash with 
"org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other" when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, while the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that detects 
recursive types (see the "alreadySeen" field in TypeExtractor): it mistakes 
the second appearance of FooBarPojo for a recursive field.

Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to 
process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls 
itself for the first field, which proceeds into the privateGetForClass case, 
which correctly detects that it is a POJO and correctly returns a 
PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field and goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is 
being processed.

(Note that if we comment out the recursive type detection (the lines that work 
with the alreadySeen field), then the output is correct.)

  was:
Consider the following code:

{code}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
    @Override
    public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
        return null;
    }
});
{code}

where FooBarPojo is the following type:

{code}
public class FooBarPojo {
    public int foo, bar;
    public FooBarPojo() {}
}
{code}

This should print a tuple type with two identical fields:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>

But it prints the following instead:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>

Note that this problem causes some co-groups in Gelly to crash with 
"org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other" when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, while the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that detects 
recursive types (see the "alreadySeen" field in TypeExtractor): it mistakes 
the second appearance of FooBarPojo for a recursive field.

Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to 
process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls 
itself for the first field, which proceeds into the privateGetForClass case, 
which correctly detects that it is a POJO and correctly returns a 
PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field and goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is 
being processed.

(Note that if we comment out the recursive type detection (the lines that work 
with the alreadySeen field), then the output is correct.)


 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 

[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2447:
---
Component/s: Java API

 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 ---

 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
  Components: Java API
Reporter: Gabor Gevay

 Consider the following code:
 DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
 DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
     @Override
     public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
         return null;
     }
 });
 where FooBarPojo is the following type:
 public class FooBarPojo {
     public int foo, bar;
     public FooBarPojo() {}
 }
 This should print a tuple type with two identical fields:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>
 But it prints the following instead:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>
 Note that this problem causes some co-groups in Gelly to crash with 
 "org.apache.flink.api.common.InvalidProgramException: The pair of co-group 
 keys are not compatible with each other" when the vertex ID type is a POJO, 
 because the second field of the Edge type gets to be a generic type, while the 
 POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns 
 different numbers for the POJO and the generic type.
 The source of the problem is the mechanism in TypeExtractor that detects 
 recursive types (see the "alreadySeen" field in TypeExtractor): it 
 mistakes the second appearance of FooBarPojo for a recursive field.
 Specifically, the following happens: createTypeInfoWithTypeHierarchy
 starts to process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it 
 calls itself for the first field, which proceeds into the privateGetForClass 
 case, which correctly detects that it is a POJO and correctly returns a 
 PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
 PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
 approaches the second field and goes into privateGetForClass, which mistakenly 
 returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type 
 is being processed.
 (Note that if we comment out the recursive type detection (the lines that work 
 with the alreadySeen field), then the output is correct.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2447:
---
Fix Version/s: 0.9.1
   0.10

 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 ---

 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
  Components: Java API
Reporter: Gabor Gevay
 Fix For: 0.10, 0.9.1


 Consider the following code:
 DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
 DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
     @Override
     public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
         return null;
     }
 });
 where FooBarPojo is the following type:
 public class FooBarPojo {
     public int foo, bar;
     public FooBarPojo() {}
 }
 This should print a tuple type with two identical fields:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>
 But it prints the following instead:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>
 Note that this problem causes some co-groups in Gelly to crash with 
 "org.apache.flink.api.common.InvalidProgramException: The pair of co-group 
 keys are not compatible with each other" when the vertex ID type is a POJO, 
 because the second field of the Edge type gets to be a generic type, while the 
 POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns 
 different numbers for the POJO and the generic type.
 The source of the problem is the mechanism in TypeExtractor that detects 
 recursive types (see the "alreadySeen" field in TypeExtractor): it 
 mistakes the second appearance of FooBarPojo for a recursive field.
 Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to 
 process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls 
 itself for the first field, which proceeds into the privateGetForClass case, 
 which correctly detects that it is a POJO and correctly returns a 
 PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
 PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
 approaches the second field and goes into privateGetForClass, which mistakenly 
 returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type 
 is being processed.
 (Note that if we comment out the recursive type detection (the lines that work 
 with the alreadySeen field), then the output is correct.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2360) EOFException

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2360:
---
Fix Version/s: 0.9.1
   0.10

 EOFException
 

 Key: FLINK-2360
 URL: https://issues.apache.org/jira/browse/FLINK-2360
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Andra Lungu
Priority: Critical
 Fix For: 0.10, 0.9.1


 The following code:
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/NodeSplittingConnectedComponents.java
 What the code does, on a very high level:
 1). Discovers the skewed nodes in a graph and splits them into subnodes, 
 recursively, in levels until we achieve a more uniform degree distribution.
 2). Creates a delta iteration that takes the split data set as a solution 
 set. On this, it runs the Connected Components algorithm. At the end of each 
 superstep, the partial results computed by the subvertices are gathered back 
 into the initial vertex, updating the overall value in the split vertices.
 3). Once the iteration converged, the graph is brought back to its initial 
 state.
 Ran on the twitter follower graph: 
 http://twitter.mpi-sws.org/data-icwsm2010.html
 With a similar configuration to the one in FLINK-2293. 
 Fails with: 
 Caused by: java.io.EOFException
   at 
 org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
   at 
 org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
   at 
 org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
   at 
 org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
   at org.apache.flink.types.StringValue.writeString(StringValue.java:796)
   at 
 org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:63)
   at 
 org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
   at 
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
   at 
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
   at 
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
   at 
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
   at 
 org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
   at 
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
   at 
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:722)
 Job Manager log:
 https://gist.github.com/andralungu/9fc100603ba8d4b8d686



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection

2015-07-30 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2437:
--

 Summary: TypeExtractor.analyzePojo has some problems around the 
default constructor detection
 Key: FLINK-2437
 URL: https://issues.apache.org/jira/browse/FLINK-2437
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


If a class does have a default constructor, but the user forgot to make it 
public, then TypeExtractor.analyzePojo still thinks everything is OK, so it 
creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up.

Furthermore, a "return null" seems to be missing from the then-branch of the if 
after catching the NoSuchMethodException, which would also cause a headache for 
PojoSerializer.

An additional minor issue is that the word "class" is printed twice in several 
places, because Class.toString() also prepends it to the class name.
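
A minimal reproduction sketch under these assumptions (the class is illustrative):

{code}
// The no-arg constructor exists but is not public, so analyzePojo accepts the
// class as a POJO, and PojoSerializer.createInstance blows up later.
public class AlmostPojo {
    public int f;

    AlmostPojo() {} // package-private: the "public" was forgotten

    public AlmostPojo(int f) {
        this.f = f;
    }
}
{code}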





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection

2015-07-30 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14647915#comment-14647915
 ] 

Gabor Gevay commented on FLINK-2437:


I just realized that it is not a problem that analyzePojo lets abstract classes 
through, so the "return null" is not missing, sorry.

 TypeExtractor.analyzePojo has some problems around the default constructor 
 detection
 

 Key: FLINK-2437
 URL: https://issues.apache.org/jira/browse/FLINK-2437
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor

 If a class does have a default constructor, but the user forgot to make it 
 public, then TypeExtractor.analyzePojo still thinks everything is OK, so it 
 creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up.
 Furthermore, a "return null" seems to be missing from the then-branch of the if 
 after catching the NoSuchMethodException, which would also cause a headache 
 for PojoSerializer.
 An additional minor issue is that the word "class" is printed twice in 
 several places, because Class.toString() also prepends it to the class name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633525#comment-14633525
 ] 

Gabor Gevay commented on FLINK-2377:


Yes, I have found the problem: TestBaseUtils.readAllResultLines is not closing 
the readers. I will open a PR that fixes this.
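
The fix pattern would presumably look like this hedged sketch (the actual PR may differ; path handling is simplified):

{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class ReadAllLines {
    static List<String> readAllResultLines(String path) throws Exception {
        List<String> lines = new ArrayList<String>();
        BufferedReader reader = new BufferedReader(new FileReader(path));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } finally {
            reader.close(); // the missing close: open files block deletion on Windows
        }
        return lines;
    }
}
{code}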

 AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
 --

 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor

 This is probably another file closing issue. (that is, Windows won't delete 
 open files, as opposed to Linux)
 I have encountered two concrete tests so far where this actually appears: 
 CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2374:
--

 Summary: File.setWritable doesn't work on Windows, which makes 
several tests fail
 Key: FLINK-2374
 URL: https://issues.apache.org/jira/browse/FLINK-2374
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial


The tests that use setWritable to test the handling of certain error conditions 
should be skipped in case we are under Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail

2015-07-17 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631270#comment-14631270
 ] 

Gabor Gevay commented on FLINK-2374:


Yes, it appears to be a duplicate, sorry.

 File.setWritable doesn't work on Windows, which makes several tests fail
 

 Key: FLINK-2374
 URL: https://issues.apache.org/jira/browse/FLINK-2374
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial

 The tests that use setWritable to test the handling of certain error 
 conditions should be skipped in case we are under Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail

2015-07-17 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay closed FLINK-2374.
--
Resolution: Duplicate

 File.setWritable doesn't work on Windows, which makes several tests fail
 

 Key: FLINK-2374
 URL: https://issues.apache.org/jira/browse/FLINK-2374
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial

 The tests that use setWritable to test the handling of certain error 
 conditions should be skipped in case we are under Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2359) Add factory methods to the Java TupleX types

2015-07-17 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631386#comment-14631386
 ] 

Gabor Gevay commented on FLINK-2359:


You're welcome!

 Add factory methods to the Java TupleX types
 

 Key: FLINK-2359
 URL: https://issues.apache.org/jira/browse/FLINK-2359
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 0.10


 The compiler doesn't infer generic type arguments from constructor arguments, 
 which means that we have to call Tuple constructors like this:
 Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo");
 I propose adding a factory method, which would provide the following 
 alternative:
 Tuple2<Integer, String> t = Tuple2.create(5, "foo");
 (Note that C++ and C# Tuples also have similar factory methods for the same 
 reason.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2376) testFindConnectableAddress sometimes fails on Windows because of the time limit

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2376:
--

 Summary: testFindConnectableAddress sometimes fails on Windows 
because of the time limit
 Key: FLINK-2376
 URL: https://issues.apache.org/jira/browse/FLINK-2376
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2377:
--

 Summary: AbstractTestBase.deleteAllTempFiles sometimes fails on 
Windows
 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor


This is probably another file closing issue. (that is, Windows won't delete 
open files, as opposed to Linux)

I have encountered two concrete tests so far where this actually appears: 
CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2378) complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2378:
--

 Summary: complexIntegrationTest1 and testGroupByFeedback sometimes 
fail on Windows
 Key: FLINK-2378
 URL: https://issues.apache.org/jira/browse/FLINK-2378
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gabor Gevay
Priority: Minor


This only happens when Maven runs the test; I can't trigger it from the IDE.

At first I thought that something is wrong with having shared variables between 
the tests in ComplexIntegrationTest, but when I eliminated the shared 
variables, the test still fails.

The error msgs:

testGroupByFeedback(org.apache.flink.streaming.api.IterateTest)
Time elapsed: 12.091 sec  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:169)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.AssertionError: null
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertTrue(Assert.java:52)
at 
org.apache.flink.streaming.api.IterateTest$6.close(IterateTest.java:447)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeOperator(StreamTask.java:182)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:112)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.lang.Thread.run(Thread.java:745)



complexIntegrationTest1(org.apache.flink.streaming.api.complex.ComplexIntegrationTest)
 Time elapsed: 15.989 sec  <<< FAILURE!
java.lang.AssertionError: Different number of lines in expected and
obtained result. expected:<9> but was:<5>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:272)
at 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:258)
at 
org.apache.flink.streaming.api.complex.ComplexIntegrationTest.after(ComplexIntegrationTest.java:91)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2369) On Windows, in testFailingSortingDataSinkTask the temp file is not removed

2015-07-16 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2369:
--

 Summary: On Windows, in testFailingSortingDataSinkTask the temp 
file is not removed
 Key: FLINK-2369
 URL: https://issues.apache.org/jira/browse/FLINK-2369
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
 Environment: Windows 7 64-bit, JDK 8u51
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The test fails with the assert at the very end ("Temp output file has not been 
removed"). This happens because FileOutputFormat.tryCleanupOnError can't delete 
the file, because it is still open (note that Linux happily deletes open 
files).

A fix would be to have the this.format.close() call not just in the finally block 
(in DataSinkTask.invoke), but also before the tryCleanupOnError call (around 
line 217).
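
A hedged sketch of the proposed ordering, wrapped in a helper for illustration (not the actual DataSinkTask code):

{code}
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.FileOutputFormat;

public class CloseBeforeCleanup {
    static void handleFailure(FileOutputFormat<?> format) throws Exception {
        format.close(); // close the stream first: Windows cannot delete open files
        if (format instanceof CleanupWhenUnsuccessful) {
            ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); // now the temp file can go
        }
    }
}
{code}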



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2359) Add factory methods to the Java TupleX types

2015-07-14 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626676#comment-14626676
 ] 

Gabor Gevay commented on FLINK-2359:


Oh, I didn't know about the diamond thing, thanks!

 Add factory methods to the Java TupleX types
 

 Key: FLINK-2359
 URL: https://issues.apache.org/jira/browse/FLINK-2359
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor

 The compiler doesn't infer generic type arguments from constructor arguments, 
 which means that we have to call Tuple constructors like this:
 Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo");
 I propose adding a factory method, which would provide the following 
 alternative:
 Tuple2<Integer, String> t = Tuple2.create(5, "foo");
 (Note that C++ and C# Tuples also have similar factory methods for the same 
 reason.)





[jira] [Created] (FLINK-2359) Add factory methods to the Java TupleX types

2015-07-14 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2359:
--

 Summary: Add factory methods to the Java TupleX types
 Key: FLINK-2359
 URL: https://issues.apache.org/jira/browse/FLINK-2359
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The compiler doesn't infer generic type arguments from constructor arguments, 
which means that we have to call Tuple constructors like this:

Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo");

I propose adding a factory method, which would provide the following 
alternative:

Tuple2<Integer, String> t = Tuple2.create(5, "foo");

(Note that C++ and C# Tuples also have similar factory methods for the same 
reason.)
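
A minimal sketch of what such a factory method could look like (the 
implementation below is an assumption, not existing Flink code):

{code}
// Hypothetical Tuple2.create; the type arguments are inferred from the
// method arguments, so the call site needs no explicit generics:
public static <T0, T1> Tuple2<T0, T1> create(T0 f0, T1 f1) {
    return new Tuple2<T0, T1>(f0, f1);
}

// Usage:
Tuple2<Integer, String> t = Tuple2.create(5, "foo");
{code}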





[jira] [Assigned] (FLINK-2148) Approximately calculate the number of distinct elements of a stream

2015-07-13 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-2148:
--

Assignee: Gabor Gevay

 Approximately calculate the number of distinct elements of a stream
 ---

 Key: FLINK-2148
 URL: https://issues.apache.org/jira/browse/FLINK-2148
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
  Labels: statistics

 In the paper
 http://people.seas.harvard.edu/~minilek/papers/f0.pdf
 Kane et al. describe an optimal algorithm for estimating the number of 
 distinct elements in a data stream.





[jira] [Created] (FLINK-2255) In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec

2015-06-21 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2255:
--

 Summary: In the TopSpeedWindowing examples, every window contains 
only 1 element, because event time is in millisec, but eviction is in sec
 Key: FLINK-2255
 URL: https://issues.apache.org/jira/browse/FLINK-2255
 Project: Flink
  Issue Type: Bug
  Components: Examples, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The event times are generated by System.currentTimeMillis(), so evictionSec 
should be multiplied by 1000 when passing it to Time.of.
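
A sketch of the one-line fix (the exact call site shown here is an assumption):

{code}
// Event timestamps come from System.currentTimeMillis(), so the eviction
// span must also be given in milliseconds:
window(Time.of(evictionSec * 1000, new CarTimestamp()))
{code}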





[jira] [Created] (FLINK-2181) SessionWindowing example has a memleak

2015-06-08 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2181:
--

 Summary: SessionWindowing example has a memleak
 Key: FLINK-2181
 URL: https://issues.apache.org/jira/browse/FLINK-2181
 Project: Flink
  Issue Type: Bug
  Components: Examples, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay


The trigger policy objects belonging to already terminated sessions are kept in 
memory, and NotifyOnLastGlobalElement also gets called on them. This causes the 
program to consume more and more memory and to slow down over time.

Mailing list discussion:
http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E





[jira] [Updated] (FLINK-2147) Approximate calculation of frequencies in data streams

2015-06-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2147:
---
Labels: statistics  (was: )

 Approximate calculation of frequencies in data streams
 --

 Key: FLINK-2147
 URL: https://issues.apache.org/jira/browse/FLINK-2147
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Priority: Minor
  Labels: statistics

 Count-Min sketch is a hashing-based algorithm for approximately keeping track 
 of the frequencies of elements in a data stream. It is described by Cormode 
 et al. in the following paper:
 http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
 Note that this algorithm can be conveniently implemented in a distributed 
 way, as described in section 3.2 of the paper.
 The paper
 http://www.vldb.org/conf/2002/S10P03.pdf
 also describes algorithms for approximately keeping track of frequencies, but 
 here the user can specify a threshold below which she is not interested in 
 the frequency of an element. The error bounds are also different from those of 
 the Count-Min sketch algorithm.
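
As an illustration, here is a minimal, self-contained Count-Min sketch along 
the lines of the Cormode et al. paper (the hash construction below is a 
simplification, not the pairwise-independent family the formal bounds require):

{code}
import java.util.Random;

public class CountMinSketch {
    private final long[][] counts;
    private final int[] hashCoeff; // one hash multiplier per row
    private final int depth, width;

    public CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.counts = new long[depth][width];
        this.hashCoeff = new int[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) {
            hashCoeff[i] = rnd.nextInt(Integer.MAX_VALUE) | 1; // odd multiplier
        }
    }

    private int bucket(int row, Object item) {
        int h = hashCoeff[row] * item.hashCode();
        h ^= h >>> 16; // mix the high bits into the low bits
        return Math.abs(h % width);
    }

    public void add(Object item) {
        for (int i = 0; i < depth; i++) {
            counts[i][bucket(i, item)]++;
        }
    }

    // Never underestimates the true frequency; overestimates only with
    // small probability, as analyzed in the paper.
    public long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, counts[i][bucket(i, item)]);
        }
        return min;
    }
}
{code}

Two sketches built with the same seed and sizes can be merged by element-wise 
addition of their count arrays, which is what makes the distributed variant of 
section 3.2 straightforward.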





[jira] [Created] (FLINK-2148) Approximately calculate the number of distinct elements of a stream

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2148:
--

 Summary: Approximately calculate the number of distinct elements 
of a stream
 Key: FLINK-2148
 URL: https://issues.apache.org/jira/browse/FLINK-2148
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Priority: Minor


In the paper
http://people.seas.harvard.edu/~minilek/papers/f0.pdf
Kane et al. describe an optimal algorithm for estimating the number of 
distinct elements in a data stream.
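
The Kane et al. algorithm itself is fairly involved; as an illustration of the 
underlying hashing idea only, here is a much simpler Flajolet-Martin style 
estimator (explicitly not the optimal algorithm from the paper):

{code}
// Tracks the maximum number of trailing zero bits among hashed elements;
// 2^maxZeros, with a correction factor, estimates the distinct count.
public class DistinctCountEstimate {
    private int maxZeros = 0;

    public void add(Object element) {
        // A real implementation would use a proper hash function here.
        long h = element.hashCode() * 0x9E3779B97F4A7C15L;
        if (h != 0) {
            maxZeros = Math.max(maxZeros, Long.numberOfTrailingZeros(h));
        }
    }

    public double estimate() {
        return Math.pow(2, maxZeros) / 0.77351; // Flajolet-Martin correction
    }
}
{code}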





[jira] [Updated] (FLINK-2145) Median calculation for windows

2015-06-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2145:
---
Labels: statistics  (was: )

 Median calculation for windows
 --

 Key: FLINK-2145
 URL: https://issues.apache.org/jira/browse/FLINK-2145
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
  Labels: statistics

 The PreReducer for this has the following algorithm: We maintain two 
 multisets (implemented, for example, as balanced binary search trees) that 
 always partition the elements of the current window into smaller-than-median 
 and larger-than-median elements. At each store and evict, we can maintain 
 this invariant with only O(1) multiset operations.





[jira] [Created] (FLINK-2145) Median calculation for windows

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2145:
--

 Summary: Median calculation for windows
 Key: FLINK-2145
 URL: https://issues.apache.org/jira/browse/FLINK-2145
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The PreReducer for this has the following algorithm: We maintain two multisets 
(implemented, for example, as balanced binary search trees) that always 
partition the elements of the current window into smaller-than-median and 
larger-than-median elements. At each store and evict, we can maintain this 
invariant with only O(1) multiset operations.
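
A sketch of this PreReducer, using TreeMaps as multisets (each step is then a 
constant number of O(log n) multiset operations; handling of empty windows and 
of evicting absent elements is omitted):

{code}
import java.util.TreeMap;

public class MedianPreReducer {
    // "low" holds the smaller-than-median half (its max is at lastKey()),
    // "high" the larger half (its min is at firstKey()).
    private final TreeMap<Double, Integer> low = new TreeMap<>();
    private final TreeMap<Double, Integer> high = new TreeMap<>();
    private int lowSize = 0, highSize = 0;

    public void store(double x) {
        if (lowSize == 0 || x <= low.lastKey()) { add(low, x); lowSize++; }
        else { add(high, x); highSize++; }
        rebalance();
    }

    public void evict(double x) {
        if (lowSize > 0 && x <= low.lastKey()) { remove(low, x); lowSize--; }
        else { remove(high, x); highSize--; }
        rebalance();
    }

    public double median() { // assumes a non-empty window
        if (lowSize == highSize) return (low.lastKey() + high.firstKey()) / 2.0;
        return lowSize > highSize ? low.lastKey() : high.firstKey();
    }

    // Keep the two sizes within 1 of each other.
    private void rebalance() {
        while (lowSize > highSize + 1) {
            double x = low.lastKey();
            remove(low, x); lowSize--; add(high, x); highSize++;
        }
        while (highSize > lowSize + 1) {
            double x = high.firstKey();
            remove(high, x); highSize--; add(low, x); lowSize++;
        }
    }

    private static void add(TreeMap<Double, Integer> m, double x) {
        m.merge(x, 1, Integer::sum);
    }

    private static void remove(TreeMap<Double, Integer> m, double x) {
        m.computeIfPresent(x, (k, c) -> c == 1 ? null : c - 1);
    }
}
{code}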





[jira] [Updated] (FLINK-2144) Implement count, average, and variance for windows

2015-06-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Labels: statistics  (was: )

 Implement count, average, and variance for windows
 --

 Key: FLINK-2144
 URL: https://issues.apache.org/jira/browse/FLINK-2144
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
  Labels: statistics

 By count I mean the number of elements in the window.
 These can be implemented very efficiently building on FLINK-2143.





[jira] [Updated] (FLINK-2144) Implement count, average, and variance for windows

2015-06-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Description: 
By count I mean the number of elements in the window.

These can be implemented very efficiently building on FLINK-2143:
Store: O(1)
Evict: O(1)
emitWindow: O(1)
memory: O(1)

  was:
By count I mean the number of elements in the window.

These can be implemented very efficiently building on FLINK-2143.


 Implement count, average, and variance for windows
 --

 Key: FLINK-2144
 URL: https://issues.apache.org/jira/browse/FLINK-2144
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
  Labels: statistics

 By count I mean the number of elements in the window.
 These can be implemented very efficiently building on FLINK-2143:
 Store: O(1)
 Evict: O(1)
 emitWindow: O(1)
 memory: O(1)
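
A sketch of the corresponding PreReducer state (the store/evict interface is 
assumed; the naive sum-of-squares formula can be numerically unstable for 
large values, where Welford-style updates would be more robust):

{code}
public class WindowStats {
    private long count = 0;
    private double sum = 0.0, sumSquares = 0.0;

    public void store(double x) { count++; sum += x; sumSquares += x * x; }
    public void evict(double x) { count--; sum -= x; sumSquares -= x * x; }

    public long count() { return count; }
    public double average() { return sum / count; } // assumes count > 0
    public double variance() {
        double mean = average();
        return sumSquares / count - mean * mean;
    }
}
{code}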





[jira] [Created] (FLINK-2146) Fast calculation of min/max with arbitrary eviction and triggers

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2146:
--

 Summary: Fast calculation of min/max with arbitrary eviction and 
triggers
 Key: FLINK-2146
 URL: https://issues.apache.org/jira/browse/FLINK-2146
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Priority: Minor


The last algorithm described here could be used:
http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
It is based on a double-ended queue which maintains a sorted list of those 
elements of the current window that could still become the maximal element.

Store: O(1) amortized
Evict: O(1)
emitWindow: O(1)
memory: O(N)
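
A sketch of the deque-based max (min is symmetric); the store/evict/emitWindow 
names follow the vocabulary of the sibling issues:

{code}
import java.util.ArrayDeque;

public class SlidingWindowMax {
    // Holds a non-increasing subsequence of the window's elements that
    // can still become the maximum.
    private final ArrayDeque<Double> deque = new ArrayDeque<>();

    public void store(double x) {
        // Strictly smaller elements at the tail can never be the max again.
        while (!deque.isEmpty() && deque.peekLast() < x) {
            deque.pollLast();
        }
        deque.addLast(x);
    }

    public void evict(double x) {
        // Only shrink the deque if the evicted element is the current max.
        if (!deque.isEmpty() && deque.peekFirst() == x) {
            deque.pollFirst();
        }
    }

    public double emitWindow() {
        return deque.peekFirst(); // current max; assumes a non-empty window
    }
}
{code}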





[jira] [Created] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2143:
--

 Summary: Add an overload to reduceWindow which takes the inverse 
of the reduceFunction as a second parameter
 Key: FLINK-2143
 URL: https://issues.apache.org/jira/browse/FLINK-2143
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Assignee: Gabor Gevay


If the inverse of the reduceFunction is also available (for example subtraction 
when summing numbers), then a PreReducer can maintain the aggregate in O(1) 
memory and O(1) time for evict, store, and emitWindow.
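
A sketch of the idea (the class and method names are illustrative, not the 
actual Flink PreReducer API):

{code}
import java.util.function.BinaryOperator;

public class InvertiblePreReducer<T> {
    private final BinaryOperator<T> reduce, inverse;
    private T aggregate; // null means an empty window

    public InvertiblePreReducer(BinaryOperator<T> reduce, BinaryOperator<T> inverse) {
        this.reduce = reduce;
        this.inverse = inverse;
    }

    public void store(T value) {
        aggregate = (aggregate == null) ? value : reduce.apply(aggregate, value);
    }

    public void evict(T value) {
        aggregate = inverse.apply(aggregate, value); // assumes a non-empty window
    }

    public T emitWindow() {
        return aggregate;
    }
}
{code}

For example, new InvertiblePreReducer<Long>(Long::sum, (a, b) -> a - b) 
maintains a windowed sum in O(1) memory with O(1) work per operation.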





[jira] [Created] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2142:
--

 Summary: GSoC project: Exact and Approximate Statistics for Data 
Streams and Windows
 Key: FLINK-2142
 URL: https://issues.apache.org/jira/browse/FLINK-2142
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The goal of this project is to implement basic statistics of data streams and 
windows (like average, median, variance, correlation, etc.) in a 
computationally efficient manner. This involves designing custom preaggregators.

The exact calculation of some statistics (e.g., frequencies or the number of 
distinct elements) would require memory proportional to the number of elements 
in the input (the window or the entire stream). However, there are efficient 
algorithms and data structures using less memory for calculating the same 
statistics only approximately, with user-specified error bounds.




