[jira] [Created] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo

2018-02-13 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8649:
--

 Summary: Scala StreamExecutionEnvironment.createInput should pass 
on the TypeInfo
 Key: FLINK-8649
 URL: https://issues.apache.org/jira/browse/FLINK-8649
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Affects Versions: 1.4.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 1.5.0


This is {{StreamExecutionEnvironment.createInput}} in the Scala API:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat))
{code}
It should pass on the implicitly obtained {{TypeInformation}} to the Java API, like this:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
{code}
The current situation creates a problem, for example, when we have generics in 
the type like in the following code, where the Java API can't deduce the 
{{TypeInformation}} on its own:
{code}
StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, Integer]](
  new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null))
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8117:
--

 Summary: Eliminate modulo operation from RoundRobinChannelSelector 
and RebalancePartitioner
 Key: FLINK-8117
 URL: https://issues.apache.org/jira/browse/FLINK-8117
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.5.0


Both {{RoundRobinChannelSelector}} and {{RebalancePartitioner}} use a modulo 
operation to wrap around when the current channel counter reaches the number of 
channels. Using an {{if}} would have better performance.
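
A minimal sketch of the difference (hypothetical variable names, ignoring the surrounding selector logic):

{code:java}
// Modulo-based wrap-around (current approach, simplified):
nextChannel = (nextChannel + 1) % numberOfChannels;

// Branch-based wrap-around (proposed, simplified). The branch is taken
// only once every numberOfChannels calls, so it is almost always
// predicted correctly:
nextChannel++;
if (nextChannel == numberOfChannels) {
    nextChannel = 0;
}
{code}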

A division with 32-bit operands takes ~25 cycles on modern Intel CPUs \[1\], but 
the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
predict the condition to be false most of the time.

\[1\] http://www.agner.org/optimize/instruction_tables.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-11-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8116:
--

 Summary: Stale comments referring to Checkpointed interface
 Key: FLINK-8116
 URL: https://issues.apache.org/jira/browse/FLINK-8116
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, Documentation
Reporter: Gabor Gevay
Priority: Trivial
 Fix For: 1.4.0, 1.5.0


Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by the 
{{CheckpointedFunction}} interface.

However, in {{SourceFunction}} there are two comments still referring to the 
old {{Checkpointed}} interface. (The code examples there also need to be 
modified.)

Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7702) Javadocs link broken

2017-09-27 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7702:
--

 Summary: Javadocs link broken
 Key: FLINK-7702
 URL: https://issues.apache.org/jira/browse/FLINK-7702
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Gabor Gevay
Priority: Minor


The "Javadocs" link in the left side menu of this page doesn't work:
https://ci.apache.org/projects/flink/flink-docs-master/

Note that it works in 1.3:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7685) CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly"

2017-09-25 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7685:
--

 Summary: CompilerException: "Bug: Logic for branching plans 
(non-tree plans) has an error, and does not track the re-joining of branches 
correctly"
 Key: FLINK-7685
 URL: https://issues.apache.org/jira/browse/FLINK-7685
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Reporter: Gabor Gevay
Priority: Minor


A Flink program which reads an input DataSet, creates 64 new DataSets from it, 
and writes these to separate files throws the following exception:

{code:java}
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly.
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:491)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
{code}

Here is some code that reproduces it:
https://github.com/ggevay/flink/tree/compiler-exception-new
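
A minimal sketch of the shape of such a program (hypothetical input path and no-op filter; the linked branch contains the actual reproducer):

{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> input = env.readTextFile("/tmp/input"); // hypothetical path

// Branch the same input into 64 DataSets and give each its own sink.
for (int i = 0; i < 64; i++) {
    input.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) {
            return true;
        }
    }).writeAsText("/tmp/output-" + i);
}
env.execute();
{code}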

Note that it works with less than 64 DataSets.

Also note that with more than 64 DataSets it throws {{CompilerException: Cannot 
currently handle nodes with more than 64 outputs}}, which is at least a clear 
error message that helps the user find a workaround.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7680) Add "Performance Tuning" section to docs

2017-09-24 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7680:
--

 Summary: Add "Performance Tuning" section to docs
 Key: FLINK-7680
 URL: https://issues.apache.org/jira/browse/FLINK-7680
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Gabor Gevay
Priority: Minor
 Fix For: 1.4.0


We could have a separate section in the docs about performance tuning (maybe 
separately for batch and streaming jobs).

It could include for example:
- object reuse
- serializer issues
- semantic annotations
- optimizer hints
- sorter code generation (FLINK-5734)

See [~fhueske]'s suggestion here:
https://github.com/apache/flink/pull/3511#discussion_r139917275



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7629) Scala stream aggregations should support nested field expressions

2017-09-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7629:
--

 Summary: Scala stream aggregations should support nested field 
expressions
 Key: FLINK-7629
 URL: https://issues.apache.org/jira/browse/FLINK-7629
 Project: Flink
  Issue Type: Bug
  Components: Scala API, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.4.0


In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work 
with a field name, and not with nested field expressions, such as 
"fieldA.fieldX". (This contradicts their documentation.)

The reason for this is that the string overload of {{KeyedStream.aggregate}} 
uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the 
integer overload does (and as the Java API does). The constructors of 
{{SumAggregator}} and {{ComparableAggregator}} call 
{{FieldAccessorFactory.getAccessor}}, which correctly handles a nested 
field expression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4868:
--

 Summary: Insertion sort could avoid the swaps
 Key: FLINK-4868
 URL: https://issues.apache.org/jira/browse/FLINK-4868
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. This is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements to move a block 
of several elements one slot to the right and insert the i-th element at the 
hole. However, it would be faster to first copy the i-th element to a temp 
location, then move the block of elements to the right without swaps, and 
then copy the saved element into the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)
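
The element-level idea, as a minimal sketch over a plain array (the actual code operates on records in {{MemorySegment}}s, so the real fix would use {{UNSAFE.copyMemory}} as described above):

{code:java}
// Insertion sort that shifts a block instead of swapping adjacent elements:
// save the i-th element, move the larger elements one slot to the right,
// then drop the saved element into the hole. Each shifted element is
// written once, instead of the three copies that a swap costs.
static void insertionSort(long[] a, int from, int to) {
    for (int i = from + 1; i < to; i++) {
        long tmp = a[i];          // copy the i-th element to a temp location
        int j = i;
        while (j > from && a[j - 1] > tmp) {
            a[j] = a[j - 1];      // shift right, no swap
            j--;
        }
        a[j] = tmp;               // copy the original i-th element to the hole
    }
}
{code}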



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4867:
--

 Summary: Investigate code generation for improving sort performance
 Key: FLINK-4867
 URL: https://issues.apache.org/jira/browse/FLINK-4867
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This issue is for investigating whether code generation could speed up sorting. 
We should make some performance measurements on hand-written code that is 
similar to what we could generate, to see whether investing more time into this 
is worth it. If we find that it is worth it, we can open a second Jira for the 
actual implementation of the code generation.

I think we could generate one class at places where we currently instantiate 
{{QuickSort}}. This generated class would include the functionality of 
{{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
{{MemorySegment.compare}}, and {{MemorySegment.swap}}.

Btw. I'm planning to give this as a student project at a TU Berlin course in 
the next few months.

Some concrete ideas about how a generated sorter could be faster than the 
current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled (see the sketch after this list)
** Endianness (currently it is optimized for big endian; in the little-endian 
case (e.g. x86) it does a {{Long.reverseBytes}} for each {{long}} read)
* {{MemorySegment.swapBytes}}
** In case of small records, using three {{UNSAFE.copyMemory}} is probably not 
as efficient as a specialized swap, because
*** We could use total loop unrolling in generated code (because we know the 
exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We will only need 2/3 the memory bandwidth, because the temporary storage 
could be a register if we swap one byte (or one {{long}}) at a time
** several checks might be eliminated
* Better inlining behaviour could be achieved 
** Virtual function calls to the methods of {{InMemorySorter}} could be 
eliminated. (Note that these are problematic for the JVM to devirtualize if 
there are different derived classes used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because 
those two while loops are too large
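
As an illustration of the length specialization mentioned above, a sketch of what a generated, fully unrolled compare might look like for a fixed 16-byte key on a little-endian machine (hypothetical method names, not the actual generated form):

{code:java}
import sun.misc.Unsafe;

// Both 8-byte halves are compared with straight-line code: no loop,
// no length checks, and the byte reversal is hard-coded for x86.
static int compare16(Unsafe unsafe, byte[] seg1, long off1, byte[] seg2, long off2) {
    long base = Unsafe.ARRAY_BYTE_BASE_OFFSET;
    long a = Long.reverseBytes(unsafe.getLong(seg1, base + off1));
    long b = Long.reverseBytes(unsafe.getLong(seg2, base + off2));
    if (a != b) {
        return unsignedCompare(a, b);
    }
    a = Long.reverseBytes(unsafe.getLong(seg1, base + off1 + 8));
    b = Long.reverseBytes(unsafe.getLong(seg2, base + off2 + 8));
    if (a != b) {
        return unsignedCompare(a, b);
    }
    return 0;
}

// Unsigned 64-bit comparison of two distinct values.
static int unsignedCompare(long a, long b) {
    return ((a < b) ^ (a < 0) ^ (b < 0)) ? -1 : 1;
}
{code}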

\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] 
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4578) AggregateOperator incorrectly sets ForwardedField with nested composite types

2016-09-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4578:
--

 Summary: AggregateOperator incorrectly sets ForwardedField with 
nested composite types
 Key: FLINK-4578
 URL: https://issues.apache.org/jira/browse/FLINK-4578
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Reporter: Gabor Gevay


When an aggregation is called on a grouped DataSet, 
{{AggregateOperator.translateToDataFlow}} tries to determine whether the field 
that is being aggregated is the same field that the grouping is based on. If 
this is not the case, then it adds the ForwardedField property for the key 
field.

However, the mechanism that makes this decision breaks when there are nested 
composite types involved, because it gets the key positions with 
{{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ positions, 
whereas the position of the field to aggregate is counted only on the outer 
type.

Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields
Here, I have changed the WordCount example to have a type of the form 
{{Tuple3<Tuple2<..., ...>, String, Integer>}}, and do {{.groupBy(1).sum(2)}} 
(which groups by the String field and sums the Integer field). If you set a 
breakpoint in {{AggregateOperator.translateToDataFlow}}, you can see that 
{{logicalKeyPositions}} contains 2, and {{fields}} also contains 2, which 
causes {{keyFieldUsedInAgg}} to be erroneously set to true. The problem is 
caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, but 
only as 1 field in {{fields}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4575) DataSet aggregate methods should support POJOs

2016-09-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4575:
--

 Summary: DataSet aggregate methods should support POJOs
 Key: FLINK-4575
 URL: https://issues.apache.org/jira/browse/FLINK-4575
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API
Reporter: Gabor Gevay
Priority: Minor


The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
support Tuples, with the fields specified by indices. With 
https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
POJOs and field expressions would be easy: {{AggregateOperator}} would create 
{{FieldAccessors}} instead of just storing field positions, and 
{{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} instead 
of the Tuple field access methods.
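
For illustration, a sketch of the usage this would enable (hypothetical POJO; the {{sum}} overload taking a field expression is the proposed API and does not exist yet):

{code:java}
public class PageVisit {
    public String url;
    public int count;
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<PageVisit> visits = env.fromElements(new PageVisit(), new PageVisit());

// With FieldAccessors, aggregations could address POJO fields by name,
// instead of being limited to Tuple indices like sum(1):
visits.sum("count");
{code}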



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-05-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3999:
--

 Summary: Rename the `running` flag in the drivers to `canceled`
 Key: FLINK-3999
 URL: https://issues.apache.org/jira/browse/FLINK-3999
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Trivial


The name of the {{running}} flag in the drivers doesn't reflect its usage: when 
the operator just stops normally, it is not running anymore, but the 
{{running}} flag will still be true, since the flag is only set to false 
when cancelling.

It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-04-09 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3722:
--

 Summary: The divisions in the InMemorySorters' swap/compare 
methods hurt performance
 Key: FLINK-3722
 URL: https://issues.apache.org/jira/browse/FLINK-3722
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Gabor Gevay
Priority: Minor


NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
use divisions (which take a lot of time \[1\]) to calculate the index of the 
MemorySegment and the offset inside the segment. [~greghogan] reported on the 
mailing list \[2\] measuring a ~12-14% performance effect in one case.

A possibility to improve the situation is the following:
The way that QuickSort mostly uses these compare and swap methods is that it 
maintains two indices, and uses them to call compare and swap. The key 
observation is that these indices are mostly stepped by one, and 
_incrementally_ calculating the quotient and modulo is actually easy when the 
index changes only by one: increment/decrement the modulo, and check whether 
the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
modulo and increment/decrement the quotient.

To implement this, InMemorySorter would have to expose an iterator that would 
have the divisor and the current modulo and quotient as state, and have 
increment/decrement methods. Compare and swap could then have overloads that 
take these iterators as arguments.
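
A minimal sketch of such an iterator (hypothetical names; the real one would live inside the sorter and also track byte offsets):

{code:java}
// Tracks a logical record index as (quotient, modulo) over the list of
// memory segments, updating both incrementally instead of dividing on
// every compare/swap call.
final class SegmentIndexIterator {
    private final int recordsPerSegment; // the divisor
    private int segment;                 // quotient: which MemorySegment
    private int offsetInSegment;         // modulo: record index inside it

    SegmentIndexIterator(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    void increment() {
        offsetInSegment++;
        if (offsetInSegment == recordsPerSegment) { // wrap-around
            offsetInSegment = 0;
            segment++;
        }
    }

    void decrement() {
        offsetInSegment--;
        if (offsetInSegment < 0) { // wrap-around
            offsetInSegment = recordsPerSegment - 1;
            segment--;
        }
    }

    int segment() { return segment; }
    int offsetInSegment() { return offsetInSegment; }
}
{code}

The compare and swap overloads could then turn {{segment()}} and {{offsetInSegment()}} into a {{MemorySegment}} reference and an offset with a multiplication instead of a division.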

\[1\] http://www.agner.org/optimize/instruction_tables.pdf
\[2\] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3519:
--

 Summary: Subclasses of Tuples don't work if the declared type of a 
DataSet is not the descendant
 Key: FLINK-3519
 URL: https://issues.apache.org/jira/browse/FLINK-3519
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gabor Gevay
Priority: Minor


If I have a subclass of TupleN, then objects of this type will turn into 
TupleNs when I try to use them in a DataSet.

For example, if I have a class like this:
{code}
public static class Foo extends Tuple1<Integer> {
    public short a;

    public Foo() {}

    public Foo(int f0, int a) {
        this.f0 = f0;
        this.a = (short)a;
    }

    @Override
    public String toString() {
        return "(" + f0 + ", " + a + ")";
    }
}
{code}
And then I do this:
{code}
env.fromElements(0, 0, 0).map(new MapFunction<Integer, Tuple1<Integer>>() {
    @Override
    public Tuple1<Integer> map(Integer value) throws Exception {
        return new Foo(5, 6);
    }
}).print();
{code}
Then I don't have Foos in the output, but only Tuples:
{code}
(5)
(5)
(5)
{code}

The problem is caused by the TupleSerializer not caring about subclasses at 
all. I guess the reason for this is performance: we don't want to deal with 
writing and reading subclass tags when we have Tuples.

I see three options for solving this:
1. Add subclass tags to the TupleSerializer: This is not really an option, 
because we don't want to lose performance.
2. Document this behavior in the javadoc of the Tuple classes.
3. Make the Tuple types final: this would be the clean solution, but it is API 
breaking, and the first victim would be Gelly: the Vertex and Edge types extend 
from tuples. (Note that the issue doesn't appear there, because the DataSets 
there always have the type of the descendant class.)

When deciding between 2. and 3., an important point to note is that if you have 
your class extend from a Tuple type instead of just adding the f0, f1, ... 
fields manually in the hopes of getting the performance boost associated with 
Tuples, then you are out of luck: the PojoSerializer will kick in anyway when 
the declared types of your DataSets are the descendant type.

If someone knows about a good reason to extend from a Tuple class, then please 
comment.

For 2., this is a suggested wording for the javadoc of the Tuple classes:
Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
always declare the element type of your DataSets to be your descendant type. 
(That is, if you have a "class A extends Tuple2<...>", then don't use instances 
of A in a DataSet<Tuple2<...>>, but use DataSet<A>.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2016-02-12 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3394:
--

 Summary: Clear up the contract of MutableObjectIterator.next(reuse)
 Key: FLINK-3394
 URL: https://issues.apache.org/jira/browse/FLINK-3394
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Priority: Critical


{{MutableObjectIterator.next(reuse)}} has the following contract (according to 
[~StephanEwen]'s comment \[1\]):

1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.

This should be documented in its javadoc (with "WARNING" so that people don't 
overlook it).
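
For illustration, a sketch of the caller-side idiom the contract implies (hypothetical record type): the object returned by {{next}} becomes the new reuse object, and the caller keeps no reference to the one it passed in.

{code:java}
MyRecord reuse = new MyRecord(); // hypothetical record type
MyRecord record;
while ((record = iterator.next(reuse)) != null) {
    process(record); // use the returned object...
    reuse = record;  // ...and hand it back as the next reuse object;
                     // the previously passed-in object must not be touched
}
{code}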

Additionally, since this was a "secret contract" up to now, all the 270 usages 
of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A 
few that are suspicious at first glance, are in {{CrossDriver}}, 
{{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, 
{{ReusingBuildFirstHashJoinIterator.callWithNextKey}}.

\[1\] 
https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3333) Documentation about object reuse should be improved

2016-02-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3333:
--

 Summary: Documentation about object reuse should be improved
 Key: FLINK-3333
 URL: https://issues.apache.org/jira/browse/FLINK-3333
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.0.0


The documentation about object reuse \[1\] has several problems, see \[2\].

\[1\] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior

\[2\] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3322:
--

 Summary: MemoryManager creates too much GC pressure with iterative 
jobs
 Key: FLINK-3322
 URL: https://issues.apache.org/jira/browse/FLINK-3322
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay


When taskmanager.memory.preallocate is false (the default), released memory 
segments are not added to a pool, but the GC is expected to take care of them. 
This puts too much pressure on the GC with iterative jobs, where the operators 
reallocate all memory at every superstep.

See the following discussion on the mailing list:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html

Reproducing the issue:
https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
The class to start is malom.Solver. If you increase the memory given to the JVM 
from 1 to 50 GB, performance gradually degrades by more than 10 times. (On 
first run, it will generate some lookup tables to /tmp for a few minutes.) (I 
think the slowdown might also depend somewhat on taskmanager.memory.fraction, 
because more unused non-managed memory results in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it

2016-02-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3321:
--

 Summary: TupleSerializerBase.getLength should know the length when 
all fields know it
 Key: FLINK-3321
 URL: https://issues.apache.org/jira/browse/FLINK-3321
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Gabor Gevay
Priority: Minor


TupleSerializerBase.getLength currently always returns -1, but it could 
actually know the length, when all the field serializers know their lengths 
(since no null can appear anywhere in Tuples, nor can a subclass of Tuple with 
additional fields appear).
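
A minimal sketch of the idea (to be computed once, e.g. in the constructor; the field serializer array name is assumed):

{code:java}
// Sum the field serializers' lengths; if any field is variable-length
// (getLength() == -1), the whole tuple is variable-length too.
private static int computeLength(TypeSerializer<?>[] fieldSerializers) {
    int totalLength = 0;
    for (TypeSerializer<?> serializer : fieldSerializers) {
        int fieldLength = serializer.getLength();
        if (fieldLength < 0) {
            return -1; // at least one variable-length field
        }
        totalLength += fieldLength;
    }
    return totalLength;
}
{code}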

(The serializer knowing the exact size has various performance benefits, for 
example see FixedLengthRecordSorter, or 
CompactingHashTable.getInitialTableSize.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-01-26 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3291:
--

 Summary: Object reuse bug in MergeIterator.HeadStream.nextHead
 Key: FLINK-3291
 URL: https://issues.apache.org/jira/browse/FLINK-3291
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Critical


MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
`reuse` object that it got as an argument. This object might be modified later 
by the caller.

This actually happens when ReduceDriver.run calls input.next (which will 
actually be MergeIterator.next(E reuse)) in the inner while loop of the 
objectReuseEnabled branch, and that calls top.nextHead with the reference that 
it got from ReduceDriver, which erroneously saves the reference, and then 
ReduceDriver later uses that same object for doing the reduce.

Another way in which this fails is when MergeIterator.next(E reuse) gives 
`reuse` to different `top`s in different calls, and then the heads end up being 
the same object.

You can observe the latter situation in action by running ReducePerformance 
here:
https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
Set memory to -Xmx200m (so that the MergeIterator actually has merging to do), 
put a breakpoint at the beginning of MergeIterator.next(reuse), and then watch 
`reuse`, and the heads of the first two elements of `this.heap` in the 
debugger. They will get to be the same object after hitting continue about 6 
times.

You can also look at the count that is printed at the end, which shouldn't be 
larger than the key range. Also, if you look into the output file 
/tmp/xxxobjectreusebug, for example the key 77 appears twice.

The good news is that I think I can see an easy fix that doesn't affect 
performance: MergeIterator.HeadStream could have a reuse object of its own as a 
member, and give that to iterator.next in nextHead(E reuse). And then we 
wouldn't need the overload of nextHead that has the reuse parameter, and 
MergeIterator.next(E reuse) could just call its other overload.
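
A sketch of that fix (member names assumed; the real class has more state):

{code:java}
// HeadStream keeps its own reuse object instead of borrowing the caller's,
// so the object passed to MergeIterator.next can never be captured as a head.
private static final class HeadStream<E> {
    private final MutableObjectIterator<E> iterator;
    private final E reuse; // owned by this HeadStream, never handed out
    private E head;

    HeadStream(MutableObjectIterator<E> iterator, E reuse) throws IOException {
        this.iterator = iterator;
        this.reuse = reuse;
        nextHead();
    }

    boolean nextHead() throws IOException {
        head = iterator.next(reuse); // always pass our own reuse object
        return head != null;
    }
}
{code}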



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted

2016-01-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3266:
--

 Summary: LocalFlinkMiniCluster leaks resources when multiple jobs 
are submitted
 Key: FLINK-3266
 URL: https://issues.apache.org/jira/browse/FLINK-3266
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.0.0
Reporter: Gabor Gevay
Priority: Minor


After a job submitted to a LocalEnvironment finishes, some threads are not 
stopped, and are stuck waiting forever.

You can observe this, if you enclose the body of the main function of the 
WordCount example with a loop that executes 100 times, and monitor the thread 
count (with VisualVM for example).

(The problem only happens if I use a mini cluster. If I use start-local.sh and 
submit jobs to it, then there is no leak.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds

2015-12-13 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3166:
--

 Summary: The first program in ObjectReuseITCase has the wrong 
expected result, and it succeeds
 Key: FLINK-3166
 URL: https://issues.apache.org/jira/browse/FLINK-3166
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay
Priority: Critical


The first program in ObjectReuseITCase has the following input:
{code}
a,1
a,2
a,3
a,4
a,50
{code}
There is a groupBy on field 0, and then a reduce, so the result should be 
1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the 
Flink program also produces this.

The problem is caused by mismatched assumptions about the object reuse rules 
of ReduceFunctions between ReduceCombineDriver.sortAndCombine and the 
ReduceFunction in the test:

ReduceCombineDriver.sortAndCombine has the following comment:
"The user function is expected to return the first input as the result."
While the ReduceFunction in the test is modifying and returning the second 
input. (And the second program in the test also has the same problem.)

I can't find the assumption that is stated in the comment in any documentation. 
For example, the javadoc of ReduceFunction should make the user aware of this. 
Or, alternatively, the code of the driver should be modified to not make this 
assumption. I'm not sure which solution is better.
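
For illustration, this is the convention the driver's comment describes, as a sketch (hypothetical element type):

{code:java}
// What ReduceCombineDriver.sortAndCombine expects: modify and return the
// *first* input. The test's ReduceFunction returns the second one instead.
public Tuple2<String, Integer> reduce(
        Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
    value1.f1 = value1.f1 + value2.f1;
    return value1;
}
{code}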





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2839) Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics

2015-10-09 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2839:
--

 Summary: Failing test: 
OperatorStatsAccumulatorTest.testAccumulatorAllStatistics
 Key: FLINK-2839
 URL: https://issues.apache.org/jira/browse/FLINK-2839
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Reporter: Gabor Gevay
Priority: Minor


I saw this test failure:

{code}
Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.633 sec <<< FAILURE! - in org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest
testAccumulatorAllStatistics(org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest)  Time elapsed: 1.5 sec  <<< FAILURE!
java.lang.AssertionError: The total number of heavy hitters should be between 0 and 5.
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest.testAccumulatorAllStatistics(OperatorStatsAccumulatorTest.java:151)
{code}

Full log 
[here|https://s3.amazonaws.com/archive.travis-ci.org/jobs/84469788/log.txt].

Maybe the test should use a constant seed for the {{Random}} object.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2818) The *ReduceDriver classes have incorrect javadocs

2015-10-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2818:
--

 Summary: The *ReduceDriver classes have incorrect javadocs
 Key: FLINK-2818
 URL: https://issues.apache.org/jira/browse/FLINK-2818
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial


The javadocs of ReduceDriver, AllReduceDriver, AllGroupReduceDriver, 
GroupReduceDriver, GroupReduceCombineDriver, and GroupCombineChainedDriver are 
outdated and/or copy-pasted-then-not-correctly-modified, which makes 
deciphering what all these classes are more difficult.

PR coming shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2809) DataSet[Unit] doesn't work

2015-10-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2809:
--

 Summary: DataSet[Unit] doesn't work
 Key: FLINK-2809
 URL: https://issues.apache.org/jira/browse/FLINK-2809
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The following code creates a DataSet\[Unit\]:
{code}
val env = ExecutionEnvironment.createLocalEnvironment()
val a = env.fromElements(1, 2, 3)
val b = a.map(_ => ())
b.writeAsText("/tmp/xxx")
env.execute()
{code}

This doesn't work, because a VoidSerializer is created, which can't cope with a 
BoxedUnit. See exception below.

I'm now thinking about creating a UnitSerializer class.



{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.lang.Void
    at org.apache.flink.api.common.typeutils.base.VoidSerializer.serialize(VoidSerializer.java:26)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2806) No TypeInfo for Scala's Nothing type

2015-10-02 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2806:
--

 Summary: No TypeInfo for Scala's Nothing type
 Key: FLINK-2806
 URL: https://issues.apache.org/jira/browse/FLINK-2806
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


When writing some generic code, I encountered a situation where I needed a 
TypeInformation[Nothing]. Two problems prevent me from getting it:
1. TypeInformationGen.mkTypeInfo doesn't return a real 
TypeInformation[Nothing]. (It actually returns a casted null in that case.)
2. The line
{code}
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
{code}
does not fire in some situations when it should, when T = Nothing. (I guess 
this is a compiler bug.)

I will open a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-09-12 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2662:
--

 Summary: CompilerException: "Bug: Plan generation for Unions 
picked a ship strategy between binary plan operators."
 Key: FLINK-2662
 URL: https://issues.apache.org/jira/browse/FLINK-2662
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: master
Reporter: Gabor Gevay


I have a Flink program which throws the exception in the jira title. Full text:


{code}
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
    at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
    at malom.Solver.main(Solver.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}


The execution plan:
http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
(I obtained this by commenting out the line that throws the exception)


The code is here:
https://github.com/ggevay/flink/tree/plan-generation-bug
The class to run is "Solver". It needs a command line argument, which is a 
directory where it writes output. (On first run, it generates some 
lookup tables for a few minutes, which are then placed in /tmp/movegen.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set

2015-08-19 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2548:
--

 Summary: VertexCentricIteration should avoid doing a coGroup with 
the edges and the solution set
 Key: FLINK-2548
 URL: https://issues.apache.org/jira/browse/FLINK-2548
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.9, 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay


Currently, the performance of vertex-centric iteration is suboptimal in 
iterations where the workset is small, because, due to the coGroups, the 
complexity of one iteration includes the number of edges and vertices of 
the entire graph:
VertexCentricIteration.buildMessagingFunction does a coGroup between the edges 
and the workset, to get the neighbors to the messaging UDF. This is problematic 
from a performance point of view, because the coGroup UDF gets called on all 
the edge groups, including those that are not getting any messages.

An analogous problem is present in 
VertexCentricIteration.createResultSimpleVertex at the creation of the updates: 
a coGroup happens between the messages and the solution set, which has the 
number of vertices of the graph included in its complexity.

Both of these coGroups could be avoided by doing a join instead (with the same 
keys that the coGroup uses), and then a groupBy. The complexity of these 
operations would be dominated by the size of the workset, as opposed to the 
number of edges or vertices of the graph. The joins should have the edges and 
the solution set at the build side to achieve this complexity. (They will not 
be rebuilt at every iteration.)
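
A sketch of the replacement on the messaging side (type parameters and UDF names are hypothetical; the actual operator wiring in VertexCentricIteration is more involved):

{code:java}
// Join the (small) workset with the edges on the vertex ID, keeping the
// edges on the build side, then group the results by target vertex to
// combine the messages. The cost is dominated by the workset size.
DataSet<Tuple2<K, Message>> messages = workset
    .join(edges, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
    .where(0)   // vertex ID in the workset
    .equalTo(0) // source vertex ID of the edge
    .with(new MessagingJoinFunction()) // hypothetical messaging UDF
    .groupBy(0)
    .reduceGroup(new MessageCombiner()); // hypothetical combiner UDF
{code}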

I made some experiments with this, and the initial results seem promising. On 
some workloads this achieves a 2x speedup overall, because later iterations 
often have quite small worksets, and those benefit the most.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2542) It should be documented that it is required from a join key to override hashCode(), when it is not a POJO

2015-08-18 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2542:
--

 Summary: It should be documented that it is required from a join 
key to override hashCode(), when it is not a POJO
 Key: FLINK-2542
 URL: https://issues.apache.org/jira/browse/FLINK-2542
 Project: Flink
  Issue Type: Bug
  Components: Gelly, Java API
Reporter: Gabor Gevay
Priority: Minor
 Fix For: 0.10, 0.9.1


If the join key is not a POJO, and does not override hashCode, then the join 
silently fails (produces empty output). I don't see this documented anywhere.

The Gelly documentation should also have this info separately, because it does 
joins internally on the vertex IDs, but the user might not know this, or might 
not look at the join documentation when using Gelly.

Here is an example code:

{noformat}
public static class ID implements Comparable<ID> {
    public long foo;

    // no default ctor --> not a POJO

    public ID(long foo) {
        this.foo = foo;
    }

    @Override
    public int compareTo(ID o) {
        return ((Long)foo).compareTo(o.foo);
    }

    @Override
    public boolean equals(Object o0) {
        if (o0 instanceof ID) {
            ID o = (ID)o0;
            return foo == o.foo;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return 42;
    }
}


public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<ID, Long>> inDegrees = env.fromElements(Tuple2.of(new ID(123l), 4l));
    DataSet<Tuple2<ID, Long>> outDegrees = env.fromElements(Tuple2.of(new ID(123l), 5l));

    DataSet<Tuple3<ID, Long, Long>> degrees = inDegrees.join(outDegrees,
            JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
        .with(new FlatJoinFunction<Tuple2<ID, Long>, Tuple2<ID, Long>, Tuple3<ID, Long, Long>>() {
            @Override
            public void join(Tuple2<ID, Long> first, Tuple2<ID, Long> second,
                    Collector<Tuple3<ID, Long, Long>> out) {
                out.collect(new Tuple3<ID, Long, Long>(first.f0, first.f1, second.f1));
            }
        }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");

    System.out.println("degrees count: " + degrees.count());
}
{noformat}


This prints 1, but if I comment out the hashCode, it prints 0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2015-08-18 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2540:
--

 Summary: LocalBufferPool.requestBuffer gets into infinite loop
 Key: FLINK-2540
 URL: https://issues.apache.org/jira/browse/FLINK-2540
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay


I'm trying to run a complicated computation that looks like this: [1].
One of the DataSource->Filter->Map chains finishes fine, but the other one 
freezes. Debugging shows that it is spinning in the while loop in 
LocalBufferPool.requestBuffer.

askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize 
are 128, so it never enters either of those ifs.

This is a stack trace: [2]

And here is the code, if you would like to run it: [3]. Unfortunately, I can't 
make it more minimal, because if I remove some operators, the problem 
disappears. The class to start is malom.Solver. (On first run, it calculates 
some lookup tables for a few minutes, and puts them into /tmp/movegen.)

[1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
[2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
[3] https://github.com/ggevay/flink/tree/deadlock-malom



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2527:
--

 Summary: If a VertexUpdateFunction calls setNewVertexValue more 
than once, the MessagingFunction will only see the first value set
 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


The problem is that if setNewVertexValue is called more than once, it sends 
each new value to the out Collector, and these all end up in the workset, but 
then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only 
the first value in the state Iterable. I see three ways to resolve this:

1. Add it to the documentation that setNewVertexValue should only be called 
once, and optionally add a check for this.
2. In setNewVertexValue, do not send the newValue to the out Collector at once, 
but only record it in outVal, and send the last recorded value after 
updateVertex returns.
3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and 
MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some 
documentation addition.)

I like 2. the best. What are your opinions?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2447:
--

 Summary: TypeExtractor returns wrong type info when a Tuple has 
two fields of the same POJO type
 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay


Consider the following code:
{code}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(
    new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
        @Override
        public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
            return null;
        }
    });
{code}

where FooBarPojo is the following type:
{code}
public class FooBarPojo {
    public int foo, bar;
    public FooBarPojo() {}
}
{code}

This should print a tuple type with two identical PojoType fields:
{code}
Java Tuple2<PojoType<FooBarPojo>, PojoType<FooBarPojo>>
{code}
But it prints the following instead:
{code}
Java Tuple2<PojoType<FooBarPojo>, GenericType<FooBarPojo>>
{code}

Note, that this problem causes some co-groups in Gelly to crash with 
"org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other" when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, but the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that would detect 
recursive types (see the "alreadySeen" field in TypeExtractor), as it mistakes 
the second appearance of FooBarPojo with a recursive field.

Specifically, the following happens: createTypeInfoWithTypeHierarchy 
starts to process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it 
calls itself for the first field, which proceeds into the privateGetForClass 
case; this correctly detects that the field is a POJO, and correctly returns a 
PojoTypeInfo, but in the meantime, in line 1191, privateGetForClass adds 
PojoTypeInfo to "alreadySeen". Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field, goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is 
being processed.

(Note that if we comment out the recursive type detection (the lines that do 
their thing with the alreadySeen field), then the output is correct.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection

2015-07-30 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2437:
--

 Summary: TypeExtractor.analyzePojo has some problems around the 
default constructor detection
 Key: FLINK-2437
 URL: https://issues.apache.org/jira/browse/FLINK-2437
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


If a class does have a default constructor, but the user forgot to make it 
public, then TypeExtractor.analyzePojo still thinks everything is OK, so it 
creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up.

Furthermore, a "return null" seems to be missing from the then-case of the if 
after catching the NoSuchMethodException, which would also cause a headache for 
PojoSerializer.

An additional minor issue is that the word "class" is printed twice in several 
places, because class.toString also prepends it to the class name.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2378) complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2378:
--

 Summary: complexIntegrationTest1 and testGroupByFeedback sometimes 
fail on Windows
 Key: FLINK-2378
 URL: https://issues.apache.org/jira/browse/FLINK-2378
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gabor Gevay
Priority: Minor


This only happens when Maven runs the test; I can't trigger it from the IDE.

At first I thought that something was wrong with having shared variables between 
the tests in ComplexIntegrationTest, but when I eliminated the shared 
variables, the test still failed.

The error messages:

{code}
testGroupByFeedback(org.apache.flink.streaming.api.IterateTest)  Time elapsed: 12.091 sec  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:169)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.AssertionError: null
    at org.junit.Assert.fail(Assert.java:86)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.junit.Assert.assertTrue(Assert.java:52)
    at org.apache.flink.streaming.api.IterateTest$6.close(IterateTest.java:447)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeOperator(StreamTask.java:182)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:112)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.lang.Thread.run(Thread.java:745)
{code}

{code}
complexIntegrationTest1(org.apache.flink.streaming.api.complex.ComplexIntegrationTest)  Time elapsed: 15.989 sec  <<< FAILURE!
java.lang.AssertionError: Different number of lines in expected and obtained result. expected:<9> but was:<5>
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:555)
    at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:272)
    at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:258)
    at org.apache.flink.streaming.api.complex.ComplexIntegrationTest.after(ComplexIntegrationTest.java:91)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2377:
--

 Summary: AbstractTestBase.deleteAllTempFiles sometimes fails on 
Windows
 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor


This is probably another file-closing issue (that is, Windows won't delete 
open files, as opposed to Linux).

I have encountered two concrete tests so far where this actually appears: 
CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2376) testFindConnectableAddress sometimes fails on Windows because of the time limit

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2376:
--

 Summary: testFindConnectableAddress sometimes fails on Windows 
because of the time limit
 Key: FLINK-2376
 URL: https://issues.apache.org/jira/browse/FLINK-2376
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail

2015-07-17 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2374:
--

 Summary: File.setWritable doesn't work on Windows, which makes 
several tests fail
 Key: FLINK-2374
 URL: https://issues.apache.org/jira/browse/FLINK-2374
 Project: Flink
  Issue Type: Bug
 Environment: Windows
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Trivial


The tests that use setWritable to test the handling of certain error conditions 
should be skipped in case we are under Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2369) On Windows, in testFailingSortingDataSinkTask the temp file is not removed

2015-07-16 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2369:
--

 Summary: On Windows, in testFailingSortingDataSinkTask the temp 
file is not removed
 Key: FLINK-2369
 URL: https://issues.apache.org/jira/browse/FLINK-2369
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
 Environment: Windows 7 64-bit, JDK 8u51
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The test fails with the assert at the very end ("Temp output file has not been 
removed"). This happens because FileOutputFormat.tryCleanupOnError can't delete 
the file, because it is still open (note that Linux happily deletes open 
files).

A fix would be to have the {{this.format.close();}} call not just in the 
finally block (in DataSinkTask.invoke), but also before the tryCleanupOnError 
call (around line 217).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2359) Add factory methods to the Java TupleX types

2015-07-14 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2359:
--

 Summary: Add factory methods to the Java TupleX types
 Key: FLINK-2359
 URL: https://issues.apache.org/jira/browse/FLINK-2359
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The compiler doesn't infer generic type arguments from constructor arguments, 
which means that we have to call Tuple constructors like this:

{code}
Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo");
{code}

I propose adding a factory method, which would provide the following 
alternative:

{code}
Tuple2<Integer, String> t = Tuple2.create(5, "foo");
{code}

(Note that C++ and C# Tuples also have similar factory methods for the same 
reason.)
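
A sketch of what such a factory could look like on {{Tuple2}} (the other arities would be analogous):

{code:java}
public static <T0, T1> Tuple2<T0, T1> create(T0 f0, T1 f1) {
    return new Tuple2<T0, T1>(f0, f1);
}
{code}

Since {{create}} is a method call rather than a constructor, the compiler infers {{T0}} and {{T1}} from the arguments.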



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2255) In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec

2015-06-21 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2255:
--

 Summary: In the TopSpeedWindowing examples, every window contains 
only 1 element, because event time is in millisec, but eviction is in sec
 Key: FLINK-2255
 URL: https://issues.apache.org/jira/browse/FLINK-2255
 Project: Flink
  Issue Type: Bug
  Components: Examples, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The event times are generated by System.currentTimeMillis(), so evictionSec 
should be multiplied by 1000 when passing it to Time.of.
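
A small illustration of the unit mismatch (simplified; this is not the 
example's actual code):

{code}
public class UnitMismatch {
    public static void main(String[] args) {
        long evictionSec = 10; // intended window size: 10 seconds
        // Event times come from System.currentTimeMillis(), i.e. they are in
        // milliseconds, so passing evictionSec unconverted yields a 10 ms
        // window, which almost always contains a single element:
        System.out.println("window width as passed: " + evictionSec + " ms");
        System.out.println("intended window width:  " + evictionSec * 1000 + " ms");
    }
}
{code}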



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2181) SessionWindowing example has a memleak

2015-06-08 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2181:
--

 Summary: SessionWindowing example has a memleak
 Key: FLINK-2181
 URL: https://issues.apache.org/jira/browse/FLINK-2181
 Project: Flink
  Issue Type: Bug
  Components: Examples, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay


The trigger policy objects belonging to already terminated sessions are kept in 
memory, and NotifyOnLastGlobalElement also gets called on them. This causes the 
program to consume more and more memory, and also to get slower over time.

Mailing list discussion:
http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2148) Approximately calculate the number of distinct elements of a stream

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2148:
--

 Summary: Approximately calculate the number of distinct elements 
of a stream
 Key: FLINK-2148
 URL: https://issues.apache.org/jira/browse/FLINK-2148
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Priority: Minor


In the paper
http://people.seas.harvard.edu/~minilek/papers/f0.pdf
Kane et al. describe an optimal algorithm for estimating the number of 
distinct elements in a data stream.
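
For illustration, here is a minimal sketch of a much simpler estimator in the 
same spirit (a k-minimum-values estimator, not the Kane et al. algorithm; it 
assumes a well-mixing hashCode):

{code}
import java.util.TreeSet;

public class KmvEstimator {
    private final int k;
    private final TreeSet<Double> smallest = new TreeSet<>();

    public KmvEstimator(int k) {
        this.k = k;
    }

    public void add(Object element) {
        // Map the hash to (0, 1); a real implementation would use a
        // stronger hash function than hashCode.
        double h = (element.hashCode() & 0x7fffffffL) / (double) Integer.MAX_VALUE;
        smallest.add(h);
        if (smallest.size() > k) {
            smallest.pollLast(); // keep only the k smallest hash values
        }
    }

    public double estimate() {
        if (smallest.size() < k) {
            // Fewer than k distinct hashes seen; the count is (nearly) exact.
            return smallest.size();
        }
        // If the k-th smallest of n uniform values is v, then n is roughly
        // (k - 1) / v.
        return (k - 1) / smallest.last();
    }
}
{code}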



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2147) Approximate calculation of frequencies in data streams

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2147:
--

 Summary: Approximate calculation of frequencies in data streams
 Key: FLINK-2147
 URL: https://issues.apache.org/jira/browse/FLINK-2147
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Priority: Minor


Count-Min sketch is a hashing-based algorithm for approximately keeping track 
of the frequencies of elements in a data stream. It is described by Cormode et 
al. in the following paper:
http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
Note that this algorithm can be conveniently implemented in a distributed way, 
as described in section 3.2 of the paper.

The paper
http://www.vldb.org/conf/2002/S10P03.pdf
also describes algorithms for approximately keeping track of frequencies, but 
here the user can specify a threshold below which she is not interested in the 
frequency of an element. The error bounds are also different from those of the 
Count-Min sketch algorithm.
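
A minimal sketch of the Count-Min idea (the hash mixing below is a simple 
placeholder; the paper calls for pairwise-independent hash functions):

{code}
public class CountMinSketch {
    private final long[][] counts;
    private final int width; // controls the error bound
    private final int depth; // controls the confidence

    public CountMinSketch(int width, int depth) {
        this.width = width;
        this.depth = depth;
        this.counts = new long[depth][width];
    }

    private int bucket(Object element, int row) {
        // Placeholder mixing of the hashCode with the row index.
        int h = element.hashCode() * (31 * row + 17);
        return Math.abs(h % width);
    }

    public void add(Object element) {
        for (int row = 0; row < depth; row++) {
            counts[row][bucket(element, row)]++;
        }
    }

    // Returns an overestimate of the element's frequency; the smallest
    // counter over the rows is the least contaminated by collisions.
    public long estimateFrequency(Object element) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, counts[row][bucket(element, row)]);
        }
        return min;
    }
}
{code}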



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2146) Fast calculation of min/max with arbitrary eviction and triggers

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2146:
--

 Summary: Fast calculation of min/max with arbitrary eviction and 
triggers
 Key: FLINK-2146
 URL: https://issues.apache.org/jira/browse/FLINK-2146
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Priority: Minor


The last algorithm described here could be used:
http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
It is based on a double-ended queue that maintains a sorted list of those 
elements of the current window that could still become the maximal element in 
the future.

Store: O(1) amortized
Evict: O(1)
emitWindow: O(1)
memory: O(N)
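
A minimal sketch of this deque-based approach for a maximum over a count-based 
sliding window (the class and method names are illustrative, not a Flink API):

{code}
import java.util.ArrayDeque;

public class SlidingWindowMax {
    // Holds indices of window elements in decreasing value order; the head
    // is always the index of the current window's maximum.
    private final ArrayDeque<Integer> deque = new ArrayDeque<>();
    private final int[] values;

    public SlidingWindowMax(int[] values) {
        this.values = values;
    }

    // Called when element i enters the window; O(1) amortized, because
    // every index is added and removed at most once.
    public void store(int i) {
        // Elements not larger than the new one can never be the maximum again.
        while (!deque.isEmpty() && values[deque.peekLast()] <= values[i]) {
            deque.pollLast();
        }
        deque.addLast(i);
    }

    // Called when element i leaves the window.
    public void evict(int i) {
        if (!deque.isEmpty() && deque.peekFirst() == i) {
            deque.pollFirst();
        }
    }

    public int emitWindow() {
        return values[deque.peekFirst()];
    }
}
{code}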



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2145) Median calculation for windows

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2145:
--

 Summary: Median calculation for windows
 Key: FLINK-2145
 URL: https://issues.apache.org/jira/browse/FLINK-2145
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The PreReducer for this has the following algorithm: We maintain two multisets 
(as, for example, balanced binary search trees) that always partition the 
elements of the current window into smaller-than-median and larger-than-median 
elements. At each store and evict, we can maintain this invariant with only 
O(1) multiset operations.
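
A minimal sketch of the two-multiset idea, using TreeMap as a multiset (for 
illustration only; it assumes evict is only called for elements that are 
actually in the window):

{code}
import java.util.TreeMap;

public class MedianMaintainer {
    // Invariant: every element of low is <= every element of high, and
    // low.size() == high.size() or low.size() == high.size() + 1, so the
    // (lower) median is always the maximum of low.
    private final TreeMap<Integer, Integer> low = new TreeMap<>();
    private final TreeMap<Integer, Integer> high = new TreeMap<>();
    private int lowSize = 0, highSize = 0;

    private static void inc(TreeMap<Integer, Integer> ms, int v) {
        ms.merge(v, 1, Integer::sum);
    }

    private static void dec(TreeMap<Integer, Integer> ms, int v) {
        if (ms.merge(v, -1, Integer::sum) == 0) {
            ms.remove(v);
        }
    }

    public void store(int v) {
        if (lowSize == 0 || v <= low.lastKey()) { inc(low, v); lowSize++; }
        else { inc(high, v); highSize++; }
        rebalance();
    }

    public void evict(int v) {
        if (lowSize > 0 && v <= low.lastKey()) { dec(low, v); lowSize--; }
        else { dec(high, v); highSize--; }
        rebalance();
    }

    public int median() {
        return low.lastKey();
    }

    private void rebalance() {
        if (lowSize > highSize + 1) {
            int v = low.lastKey(); dec(low, v); lowSize--; inc(high, v); highSize++;
        } else if (highSize > lowSize) {
            int v = high.firstKey(); dec(high, v); highSize--; inc(low, v); lowSize++;
        }
    }
}
{code}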



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2144) Implement count, average, and variance for windows

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2144:
--

 Summary: Implement count, average, and variance for windows
 Key: FLINK-2144
 URL: https://issues.apache.org/jira/browse/FLINK-2144
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


By count I mean the number of elements in the window.

These can be implemented very efficiently by building on FLINK-2143; see the 
sketch below.
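
For example, average and variance can be maintained on top of invertible 
aggregates: count, sum, and sum of squares each support O(1) store and evict 
(a minimal sketch, not Flink code):

{code}
public class VarianceAggregate {
    private long count;
    private double sum;
    private double sumOfSquares;

    public void store(double x) {
        count++;
        sum += x;
        sumOfSquares += x * x;
    }

    public void evict(double x) {
        count--;
        sum -= x;
        sumOfSquares -= x * x;
    }

    public double average() {
        return sum / count;
    }

    public double variance() {
        // E[X^2] - E[X]^2; note that this formula can be numerically
        // unstable, so a production version might prefer Welford-style
        // updates.
        double mean = average();
        return sumOfSquares / count - mean * mean;
    }
}
{code}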



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2143:
--

 Summary: Add an overload to reduceWindow which takes the inverse 
of the reduceFunction as a second parameter
 Key: FLINK-2143
 URL: https://issues.apache.org/jira/browse/FLINK-2143
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Gevay
Assignee: Gabor Gevay


If the inverse of the reduceFunction is also available (for example subtraction 
when summing numbers), then a PreReducer can maintain the aggregate in O(1) 
memory and O(1) time for evict, store, and emitWindow.
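
A minimal sketch of such a PreReducer (the class shape is assumed for 
illustration; ReduceFunction is Flink's standard reduce interface):

{code}
import org.apache.flink.api.common.functions.ReduceFunction;

public class InvertiblePreReducer<T> {
    private final ReduceFunction<T> reduce;   // e.g. (a, b) -> a + b
    private final ReduceFunction<T> inverse;  // e.g. (a, b) -> a - b
    private T aggregate;

    public InvertiblePreReducer(ReduceFunction<T> reduce, ReduceFunction<T> inverse) {
        this.reduce = reduce;
        this.inverse = inverse;
    }

    public void store(T element) throws Exception {
        aggregate = (aggregate == null) ? element : reduce.reduce(aggregate, element);
    }

    public void evict(T element) throws Exception {
        // Undo the effect of storing the evicted element, in O(1).
        aggregate = inverse.reduce(aggregate, element);
    }

    public T emitWindow() {
        return aggregate;
    }
}
{code}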



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows

2015-06-03 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2142:
--

 Summary: GSoC project: Exact and Approximate Statistics for Data 
Streams and Windows
 Key: FLINK-2142
 URL: https://issues.apache.org/jira/browse/FLINK-2142
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The goal of this project is to implement basic statistics of data streams and 
windows (like average, median, variance, correlation, etc.) in a 
computationally efficient manner. This involves designing custom preaggregators.

The exact calculation of some statistics (e.g. frequencies, or the number of 
distinct elements) would require memory proportional to the number of elements 
in the input (the window or the entire stream). However, there are efficient 
algorithms and data structures that use less memory to calculate the same 
statistics only approximately, with user-specified error bounds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2121) FileInputFormat.addFilesInDir miscalculates total size

2015-05-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2121:
--

 Summary: FileInputFormat.addFilesInDir miscalculates total size
 Key: FLINK-2121
 URL: https://issues.apache.org/jira/browse/FLINK-2121
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


In FileInputFormat.addFilesInDir, the length variable should start from 0, 
because callers always add the return value to their own length (instead of 
just assigning it). So with the current version, the length from before the 
call is counted twice in the result.

mvn verify caught this for me just now. The reason why this hasn't been seen 
yet is that testGetStatisticsMultipleNestedFiles catches it only if it gets 
the listing of the outer directory in a certain order. Concretely, if the 
inner directory is seen before the other file in the outer directory, then 
length is 0 at that point, so the bug doesn't show. But if the other file is 
seen first, then its size is added twice to the total result.
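
A simplified illustration of the double counting (not the actual Flink code):

{code}
public class DoubleCountBug {

    // Buggy helper: seeds its accumulator with the caller's running total.
    static long addFilesInDir(long length) {
        long[] fileSizes = {10, 20}; // sizes of the files in the directory
        for (long size : fileSizes) {
            length += size;
        }
        return length;
    }

    public static void main(String[] args) {
        long totalLength = 100; // file already seen in the outer directory
        // The caller adds the return value to its own total, so the 100
        // that seeded the helper is counted twice:
        totalLength += addFilesInDir(totalLength);
        System.out.println(totalLength); // prints 230 instead of the correct 130
    }
}
{code}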



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2114) PunctuationPolicy.toString() throws NullPointerException if extractor is null

2015-05-29 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2114:
--

 Summary: PunctuationPolicy.toString() throws NullPointerException 
if extractor is null
 Key: FLINK-2114
 URL: https://issues.apache.org/jira/browse/FLINK-2114
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


Parentheses are missing in PunctuationPolicy.toString() around the conditional 
operator that checks for null, which makes the condition always true.
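
A simplified illustration of the precedence problem (not the actual 
PunctuationPolicy code):

{code}
public class PrecedenceBug {
    public static void main(String[] args) {
        Object extractor = null;
        // Without parentheses, + binds tighter than ?:, so the commented-out
        // line parses as ("extractor: " + extractor) != null, which is always
        // true, and extractor.toString() then throws a NullPointerException:
        // String s = "extractor: " + extractor != null ? extractor.toString() : "null";
        // The fix is to parenthesize the conditional:
        String s = "extractor: " + (extractor != null ? extractor.toString() : "null");
        System.out.println(s); // prints "extractor: null"
    }
}
{code}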



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)