[jira] [Created] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
Gabor Gevay created FLINK-8649: -- Summary: Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo Key: FLINK-8649 URL: https://issues.apache.org/jira/browse/FLINK-8649 Project: Flink Issue Type: Bug Components: Scala API Affects Versions: 1.4.0 Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 1.5.0 This is {{StreamExecutionEnvironment.createInput}} in the Scala API:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat))
{code}
It should pass on the implicitly obtained {{TypeInformation}} to Java like this:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
{code}
The current situation causes a problem, for example, when the type contains generics, as in the following code, where the Java API can't deduce the {{TypeInformation}} on its own:
{code}
StreamExecutionEnvironment.getExecutionEnvironment
  .createInput[Tuple2[Integer, Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null))
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner
Gabor Gevay created FLINK-8117: -- Summary: Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner Key: FLINK-8117 URL: https://issues.apache.org/jira/browse/FLINK-8117 Project: Flink Issue Type: Improvement Components: Local Runtime, Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Fix For: 1.5.0 Both {{RoundRobinChannelSelector}} and {{RebalancePartitioner}} use a modulo operation to wrap around when the current channel counter reaches the number of channels. Using an {{if}} would have better performance. A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but the {{if}} will be only 1-2 cycles on average, since the branch predictor can most of the time predict the condition to be false. \[1\] http://www.agner.org/optimize/instruction_tables.pdf -- This message was sent by Atlassian JIRA (v6.4.14#64029)
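The proposed change can be sketched in plain Java (a minimal illustration; the class and method names are made up for the example, not the actual Flink code):

```java
// Sketch of round-robin channel selection with modulo-based vs. branch-based
// wraparound. The branch in selectWithIf is almost always not taken, so the
// branch predictor makes it cheap, unlike the division implied by %.
public class RoundRobinSketch {
    private int nextChannel = -1;

    // Modulo-based wraparound, analogous to the current code:
    public int selectWithModulo(int numberOfChannels) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    // Branch-based wraparound, as proposed:
    public int selectWithIf(int numberOfChannels) {
        nextChannel++;
        if (nextChannel >= numberOfChannels) {
            nextChannel = 0;
        }
        return nextChannel;
    }
}
```

Both variants produce the same channel sequence 0, 1, ..., numberOfChannels - 1, 0, ...; only the cost of the wraparound differs.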
[jira] [Created] (FLINK-8116) Stale comments referring to Checkpointed interface
Gabor Gevay created FLINK-8116: -- Summary: Stale comments referring to Checkpointed interface Key: FLINK-8116 URL: https://issues.apache.org/jira/browse/FLINK-8116 Project: Flink Issue Type: Bug Components: DataStream API, Documentation Reporter: Gabor Gevay Priority: Trivial Fix For: 1.4.0, 1.5.0 Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by the {{CheckpointedFunction}} interface. However, in {{SourceFunction}} there are two comments still referring to the old {{Checkpointed}} interface. (The code examples there also need to be modified.) Note that the problem also occurs in {{StreamExecutionEnvironment}}, and possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7702) Javadocs link broken
Gabor Gevay created FLINK-7702: -- Summary: Javadocs link broken Key: FLINK-7702 URL: https://issues.apache.org/jira/browse/FLINK-7702 Project: Flink Issue Type: Bug Components: Documentation Reporter: Gabor Gevay Priority: Minor The "Javadocs" link in the left side menu of this page doesn't work: https://ci.apache.org/projects/flink/flink-docs-master/ Note that it works in 1.3: https://ci.apache.org/projects/flink/flink-docs-release-1.3/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7685) CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly"
Gabor Gevay created FLINK-7685: -- Summary: CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly" Key: FLINK-7685 URL: https://issues.apache.org/jira/browse/FLINK-7685 Project: Flink Issue Type: Bug Components: Optimizer Reporter: Gabor Gevay Priority: Minor A Flink program which reads an input DataSet, creates 64 new DataSets from it, and writes these to separate files throws the following exception:
{code:java}
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly.
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:491)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
{code}
Here is some code that reproduces it: https://github.com/ggevay/flink/tree/compiler-exception-new Note that it works with fewer than 64 DataSets. Also note that with more than 64 DataSets it throws {{CompilerException: Cannot currently handle nodes with more than 64 outputs}}, which is at least a clear error message that helps the user find a workaround. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7680) Add "Performance Tuning" section to docs
Gabor Gevay created FLINK-7680: -- Summary: Add "Performance Tuning" section to docs Key: FLINK-7680 URL: https://issues.apache.org/jira/browse/FLINK-7680 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Gabor Gevay Priority: Minor Fix For: 1.4.0 We could have a separate section in the docs about performance tuning (maybe separately for batch and streaming jobs). It could include, for example:
- object reuse
- serializer issues
- semantic annotations
- optimizer hints
- sorter code generation (FLINK-5734)
See [~fhueske]'s suggestion here: https://github.com/apache/flink/pull/3511#discussion_r139917275 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7629) Scala stream aggregations should support nested field expressions
Gabor Gevay created FLINK-7629: -- Summary: Scala stream aggregations should support nested field expressions Key: FLINK-7629 URL: https://issues.apache.org/jira/browse/FLINK-7629 Project: Flink Issue Type: Bug Components: Scala API, Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Fix For: 1.4.0 In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work with a field name, and not with nested field expressions, such as "fieldA.fieldX". (This contradicts their documentation.) The reason for this is that the string overload of {{KeyedStream.aggregate}} uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the integer overload does (and as the Java API does). The ctors of {{SumAggregator}} or {{ComparableAggregator}} will call {{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested field expression. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps
Gabor Gevay created FLINK-4868: -- Summary: Insertion sort could avoid the swaps Key: FLINK-4868 URL: https://issues.apache.org/jira/browse/FLINK-4868 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. This is quite hot code, as it runs every time we reach the bottom of the quicksort recursion tree. The inner loop does a series of swaps on adjacent elements to move a block of several elements one slot to the right and insert the ith element at the hole. However, it would be faster to first copy the ith element to a temporary location, then move the block of elements to the right without swaps, and then copy the original ith element into the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once per element (as opposed to the three calls currently made in {{MemorySegment.swap}}). (Note that I'm not sure whether {{UNSAFE.copyMemory}} behaves like memmove or like memcpy, so I'm not sure whether we could move the entire block of elements with perhaps even a single {{UNSAFE.copyMemory}}.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
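The hole-based technique described above can be illustrated on a plain int array (the real code operates on serialized records in MemorySegments via {{UNSAFE.copyMemory}}; this is only a sketch of the idea):

```java
// Insertion sort two ways: the swap-based inner loop (analogous to the
// current code) vs. the hole-based inner loop (the proposed improvement).
public class InsertionSortSketch {

    // Swap-based: each step of the inner loop swaps two adjacent elements,
    // i.e. three copies per moved element.
    static void insertionSortWithSwaps(int[] a) {
        for (int i = 1; i < a.length; i++) {
            for (int j = i; j > 0 && a[j - 1] > a[j]; j--) {
                int tmp = a[j];
                a[j] = a[j - 1];
                a[j - 1] = tmp;
            }
        }
    }

    // Hole-based: copy a[i] aside once, shift the larger elements one slot
    // to the right (one copy each), then drop a[i] into the hole.
    static void insertionSortWithHole(int[] a) {
        for (int i = 1; i < a.length; i++) {
            int tmp = a[i];
            int j = i;
            while (j > 0 && a[j - 1] > tmp) {
                a[j] = a[j - 1]; // move right, no swap
                j--;
            }
            a[j] = tmp;
        }
    }
}
```

Both produce the same sorted order; the hole-based version just does roughly a third fewer copies in the inner loop.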
[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance
Gabor Gevay created FLINK-4867: -- Summary: Investigate code generation for improving sort performance Key: FLINK-4867 URL: https://issues.apache.org/jira/browse/FLINK-4867 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This issue is for investigating whether code generation could speed up sorting. We should make some performance measurements on hand-written code that is similar to what we could generate, to see whether investing more time into this is worth it. If we find that it is worth it, we can open a second Jira for the actual implementation of the code generation. I think we could generate one class at the places where we currently instantiate {{QuickSort}}. This generated class would include the functionality of {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, {{MemorySegment.compare}}, and {{MemorySegment.swap}}. Btw. I'm planning to give this as a student project at a TU Berlin course in the next few months. Some concrete ideas about how a generated sorter could be faster than the current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled
** Endianness (currently it is optimized for big endian; in the little-endian case (e.g. x86) it does a {{Long.reverseBytes}} for each long read)
* {{MemorySegment.swapBytes}}
** In the case of small records, using three {{UNSAFE.copyMemory}} calls is probably not as efficient as a specialized swap, because
*** We could use total loop unrolling in generated code (because we know the exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We would only need 2/3 of the memory bandwidth, because the temporary storage could be a register if we swap one byte (or one {{long}}) at a time
** Several checks might be eliminated
* Better inlining behaviour could be achieved
** Virtual function calls to the methods of {{InMemorySorter}} could be eliminated. (Note that these are problematic for the JVM to devirtualize if different derived classes are used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because those two while loops are too large
\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
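The kind of specialization listed above can be sketched in plain Java (purely illustrative; the real code would be generated at runtime and would operate on MemorySegments via Unsafe, and the class, method names, and long[] backing here are assumptions for the example):

```java
// Sketches of sort primitives specialized for a known record layout.
public class GeneratedSortOps {

    // Compare two 8-byte big-endian normalized keys that were read as
    // longs on a little-endian machine: one reverseBytes each, then an
    // unsigned comparison, with no loop at all.
    static int compareKey8(long a, long b) {
        return Long.compareUnsigned(Long.reverseBytes(a), Long.reverseBytes(b));
    }

    // Fully unrolled swap for a record known to be exactly two longs wide:
    // the temporaries stay in registers, unlike a generic byte-copy loop.
    static void swapRecord16(long[] data, int i, int j) {
        long t0 = data[i];
        data[i] = data[j];
        data[j] = t0;
        long t1 = data[i + 1];
        data[i + 1] = data[j + 1];
        data[j + 1] = t1;
    }
}
```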
[jira] [Created] (FLINK-4578) AggregateOperator incorrectly sets ForwardedField with nested composite types
Gabor Gevay created FLINK-4578: -- Summary: AggregateOperator incorrectly sets ForwardedField with nested composite types Key: FLINK-4578 URL: https://issues.apache.org/jira/browse/FLINK-4578 Project: Flink Issue Type: Bug Components: DataSet API Reporter: Gabor Gevay When an aggregation is called on a grouped DataSet, {{AggregateOperator.translateToDataFlow}} tries to determine whether the field that is being aggregated is the same field that the grouping is based on. If this is not the case, then it adds the ForwardedField property for the key field. However, the mechanism that makes this decision breaks when there are nested composite types involved, because it gets the key positions with {{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ positions, whereas the position of the field to aggregate is counted only on the outer type. Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields Here, I have changed the WordCount example to use a {{Tuple3}} whose first field is a nested {{Tuple2}}, second field a {{String}}, and third field an {{Integer}}, and do {{.groupBy(1).sum(2)}} (which groups by the String field and sums the Integer field). If you set a breakpoint in {{AggregateOperator.translateToDataFlow}}, you can see that {{logicalKeyPositions}} contains 2, and {{fields}} also contains 2, which causes {{keyFieldUsedInAgg}} to be erroneously set to true. The problem is caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, but only 1 field in {{fields}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4575) DataSet aggregate methods should support POJOs
Gabor Gevay created FLINK-4575: -- Summary: DataSet aggregate methods should support POJOs Key: FLINK-4575 URL: https://issues.apache.org/jira/browse/FLINK-4575 Project: Flink Issue Type: Improvement Components: DataSet API Reporter: Gabor Gevay Priority: Minor The aggregate methods of DataSets (aggregate, sum, min, max) currently only support Tuples, with the fields specified by indices. With https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for POJOs and field expressions would be easy: {{AggregateOperator}} would create {{FieldAccessors}} instead of just storing field positions, and {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} instead of the Tuple field access methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
Gabor Gevay created FLINK-3999: -- Summary: Rename the `running` flag in the drivers to `canceled` Key: FLINK-3999 URL: https://issues.apache.org/jira/browse/FLINK-3999 Project: Flink Issue Type: Bug Components: Local Runtime Reporter: Gabor Gevay Priority: Trivial The name of the {{running}} flag in the drivers doesn't reflect its usage: when the operator just stops normally, it is not running anymore, but the {{running}} flag will still be true, since the flag is only set to false when cancelling. It should be renamed, and its value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
Gabor Gevay created FLINK-3722: -- Summary: The divisions in the InMemorySorters' swap/compare methods hurt performance Key: FLINK-3722 URL: https://issues.apache.org/jira/browse/FLINK-3722 Project: Flink Issue Type: Improvement Components: Distributed Runtime Reporter: Gabor Gevay Priority: Minor NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods use divisions (which take a lot of time \[1\]) to calculate the index of the MemorySegment and the offset inside the segment. [~greghogan] reported on the mailing list \[2\] measuring a ~12-14% performance effect in one case. A possibility to improve the situation is the following: The way that QuickSort mostly uses these compare and swap methods is that it maintains two indices, and uses them to call compare and swap. The key observation is that these indices are mostly stepped by one, and _incrementally_ calculating the quotient and modulo is actually easy when the index changes only by one: increment/decrement the modulo, and check whether the modulo has reached 0 or the divisor, and if it did, then wrap-around the modulo and increment/decrement the quotient. To implement this, InMemorySorter would have to expose an iterator that would have the divisor and the current modulo and quotient as state, and have increment/decrement methods. Compare and swap could then have overloads that take these iterators as arguments. \[1\] http://www.agner.org/optimize/instruction_tables.pdf \[2\] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
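The incremental quotient/modulo idea described above can be sketched as follows (an illustrative stand-alone class; the name, the record-granular stepping, and the accessors are assumptions for the example, not the proposed Flink API):

```java
// Maintains (segmentIndex, recordInSegment) for a logical record index
// incrementally, so that stepping the index by one needs no division,
// unlike recomputing index / recordsPerSegment and index % recordsPerSegment.
public class SegmentCursor {
    private final int recordsPerSegment; // the divisor
    private int segmentIndex;            // quotient, starts at 0
    private int recordInSegment;         // remainder, starts at 0

    public SegmentCursor(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    // Equivalent to recomputing quotient and remainder for index + 1.
    public void increment() {
        recordInSegment++;
        if (recordInSegment == recordsPerSegment) { // wrap-around
            recordInSegment = 0;
            segmentIndex++;
        }
    }

    // Equivalent to recomputing quotient and remainder for index - 1.
    public void decrement() {
        if (recordInSegment == 0) { // wrap-around
            recordInSegment = recordsPerSegment;
            segmentIndex--;
        }
        recordInSegment--;
    }

    public int segmentIndex() { return segmentIndex; }
    public int recordInSegment() { return recordInSegment; }
}
```

Compare and swap overloads taking two such cursors could then locate both records without any division.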
[jira] [Created] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant
Gabor Gevay created FLINK-3519: -- Summary: Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant Key: FLINK-3519 URL: https://issues.apache.org/jira/browse/FLINK-3519 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Gabor Gevay Priority: Minor If I have a subclass of TupleN, then objects of this type will turn into TupleNs when I try to use them in a DataSet. For example, if I have a class like this:
{code}
public static class Foo extends Tuple1<Integer> {
	public short a;

	public Foo() {}

	public Foo(int f0, int a) {
		this.f0 = f0;
		this.a = (short) a;
	}

	@Override
	public String toString() {
		return "(" + f0 + ", " + a + ")";
	}
}
{code}
And then I do this:
{code}
env.fromElements(0, 0, 0).map(new MapFunction<Integer, Tuple1<Integer>>() {
	@Override
	public Tuple1<Integer> map(Integer value) throws Exception {
		return new Foo(5, 6);
	}
}).print();
{code}
Then I don't have Foos in the output, but only Tuples:
{code}
(5)
(5)
(5)
{code}
The problem is caused by the TupleSerializer not caring about subclasses at all. I guess the reason for this is performance: we don't want to deal with writing and reading subclass tags when we have Tuples. I see three options for solving this:
1. Add subclass tags to the TupleSerializer: this is not really an option, because we don't want to lose performance.
2. Document this behavior in the javadoc of the Tuple classes.
3. Make the Tuple types final: this would be the clean solution, but it is API-breaking, and the first victim would be Gelly: the Vertex and Edge types extend from tuples. (Note that the issue doesn't appear there, because the DataSets there always have the type of the descendant class.)
When deciding between 2. and 3., an important point to note is that if you have your class extend from a Tuple type instead of just adding the f0, f1, ... fields manually in the hope of getting the performance boost associated with Tuples, then you are out of luck: the PojoSerializer will kick in anyway when the declared types of your DataSets are the descendant type. If someone knows a good reason to extend from a Tuple class, then please comment. For 2., this is a suggested wording for the javadoc of the Tuple classes: Warning: please don't subclass the Tuple classes, but if you do, then be sure to always declare the element type of your DataSets as your descendant type. (That is, if you have a {{class A extends Tuple2}}, then don't use instances of A in a DataSet declared with the Tuple type, but use a DataSet declared with A.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)
Gabor Gevay created FLINK-3394: -- Summary: Clear up the contract of MutableObjectIterator.next(reuse) Key: FLINK-3394 URL: https://issues.apache.org/jira/browse/FLINK-3394 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Gabor Gevay Priority: Critical {{MutableObjectIterator.next(reuse)}} has the following contract (according to [~StephanEwen]'s comment \[1\]):
1. The caller may not hold onto {{reuse}} any more
2. The iterator implementor may not hold onto the returned object any more.
This should be documented in its javadoc (with "WARNING" so that people don't overlook it). Additionally, since this was a "secret contract" up to now, all 270 usages of {{MutableObjectIterator.next(reuse)}} should be checked for violations. A few that look suspicious at first glance are in {{CrossDriver}}, {{UnionWithTempOperator}}, {{MutableHashTable.ProbeIterator.next}}, and {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. \[1\] https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3333) Documentation about object reuse should be improved
Gabor Gevay created FLINK-3333: -- Summary: Documentation about object reuse should be improved Key: FLINK-3333 URL: https://issues.apache.org/jira/browse/FLINK-3333 Project: Flink Issue Type: Bug Affects Versions: 1.0.0 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Fix For: 1.0.0 The documentation about object reuse \[1\] has several problems, see \[2\]. \[1\] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior \[2\] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
Gabor Gevay created FLINK-3322: -- Summary: MemoryManager creates too much GC pressure with iterative jobs Key: FLINK-3322 URL: https://issues.apache.org/jira/browse/FLINK-3322 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Gabor Gevay When taskmanager.memory.preallocate is false (the default), released memory segments are not added to a pool, but the GC is expected to take care of them. This puts too much pressure on the GC with iterative jobs, where the operators reallocate all memory at every superstep. See the following discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html Reproducing the issue: https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc The class to start is malom.Solver. If you increase the memory given to the JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. (It will generate some lookup tables to /tmp on the first run, which takes a few minutes.) (I think the slowdown might also depend somewhat on taskmanager.memory.fraction, because more unused non-managed memory results in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3321) TupleSerializerBase.getLength should know the length when all fields know it
Gabor Gevay created FLINK-3321: -- Summary: TupleSerializerBase.getLength should know the length when all fields know it Key: FLINK-3321 URL: https://issues.apache.org/jira/browse/FLINK-3321 Project: Flink Issue Type: Improvement Components: Java API Reporter: Gabor Gevay Priority: Minor TupleSerializerBase.getLength currently always returns -1, but it could actually know the length, when all the field serializers know their lengths (since no null can appear anywhere in Tuples, nor can a subclass of Tuple with additional fields appear). (The serializer knowing the exact size has various performance benefits, for example see FixedLengthRecordSorter, or CompactingHashTable.getInitialTableSize.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
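The proposed logic can be sketched as a small helper (illustrative only; the name and the int[] input are assumptions, not the actual TupleSerializerBase code, where the lengths would come from the field serializers' getLength() calls):

```java
// If every field serializer reports a fixed length (>= 0), the tuple's
// serialized length is the sum of the field lengths; if any field is
// variable-length (reported as -1), the tuple's length stays unknown (-1).
public class TupleLengthSketch {
    static int combinedLength(int[] fieldSerializerLengths) {
        int sum = 0;
        for (int len : fieldSerializerLengths) {
            if (len < 0) {
                return -1; // at least one field has variable length
            }
            sum += len;
        }
        return sum;
    }
}
```

This is sound precisely because, as the issue notes, no null fields and no subclasses with extra fields can appear inside Tuples.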
[jira] [Created] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead
Gabor Gevay created FLINK-3291: -- Summary: Object reuse bug in MergeIterator.HeadStream.nextHead Key: FLINK-3291 URL: https://issues.apache.org/jira/browse/FLINK-3291 Project: Flink Issue Type: Bug Affects Versions: 1.0.0 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Critical MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the `reuse` object that it got as an argument. This object might be modified later by the caller. This actually happens when ReduceDriver.run calls input.next (which will actually be MergeIterator.next(E reuse)) in the inner while loop of the objectReuseEnabled branch, and that calls top.nextHead with the reference that it got from ReduceDriver, which erroneously saves the reference, and then ReduceDriver later uses that same object for doing the reduce. Another way in which this fails is when MergeIterator.next(E reuse) gives `reuse` to different `top`s in different calls, and then the heads end up being the same object. You can observe the latter situation in action by running ReducePerformance here: https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug Set memory to -Xmx200m (so that the MergeIterator actually has merging to do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then watch `reuse`, and the heads of the first two elements of `this.heap` in the debugger. They will get to be the same object after hitting continue about 6 times. You can also look at the count that is printed at the end, which shouldn't be larger than the key range. Also, if you look into the output file /tmp/xxxobjectreusebug, for example the key 77 appears twice. The good news is that I think I can see an easy fix that doesn't affect performance: MergeIterator.HeadStream could have a reuse object of its own as a member, and give that to iterator.next in nextHead(E reuse). 
And then we wouldn't need the overload of nextHead that has the reuse parameter, and MergeIterator.next(E reuse) could just call its other overload. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3266) LocalFlinkMiniCluster leaks resources when multiple jobs are submitted
Gabor Gevay created FLINK-3266: -- Summary: LocalFlinkMiniCluster leaks resources when multiple jobs are submitted Key: FLINK-3266 URL: https://issues.apache.org/jira/browse/FLINK-3266 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.0.0 Reporter: Gabor Gevay Priority: Minor After a job submitted to a LocalEnvironment finishes, some threads are not stopped, and are stuck in waiting forever. You can observe this, if you enclose the body of the main function of the WordCount example with a loop that executes 100 times, and monitor the thread count (with VisualVM for example). (The problem only happens if I use a mini cluster. If I use start-local.sh and submit jobs to it, then there is no leak.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3166) The first program in ObjectReuseITCase has the wrong expected result, and it succeeds
Gabor Gevay created FLINK-3166: -- Summary: The first program in ObjectReuseITCase has the wrong expected result, and it succeeds Key: FLINK-3166 URL: https://issues.apache.org/jira/browse/FLINK-3166 Project: Flink Issue Type: Bug Reporter: Gabor Gevay Priority: Critical The first program in ObjectReuseITCase has the following input:
a,1
a,2
a,3
a,4
a,50
There is a groupBy on field 0, and then a reduce, so the result should be 1+2+3+4+50 = 60. But the hardcoded expected result is 100, and running the Flink program also produces this. The problem is caused by mismatched assumptions between ReduceCombineDriver.sortAndCombine and the ReduceFunction in the test about the object reuse rules of ReduceFunctions: ReduceCombineDriver.sortAndCombine has the following comment: "The user function is expected to return the first input as the result." While the ReduceFunction in the test is modifying and returning the second input. (The second program in the test also has the same problem.) I can't find the assumption that is stated in the comment in any documentation. For example, the javadoc of ReduceFunction should make the user aware of this. Or, alternatively, the code of the driver should be modified to not make this assumption. I'm not sure which solution is better. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2839) Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics
Gabor Gevay created FLINK-2839: -- Summary: Failing test: OperatorStatsAccumulatorTest.testAccumulatorAllStatistics Key: FLINK-2839 URL: https://issues.apache.org/jira/browse/FLINK-2839 Project: Flink Issue Type: Bug Components: flink-contrib Reporter: Gabor Gevay Priority: Minor I saw this test failure:
{code}
Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.633 sec <<< FAILURE! - in org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest
testAccumulatorAllStatistics(org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest)  Time elapsed: 1.5 sec  <<< FAILURE!
java.lang.AssertionError: The total number of heavy hitters should be between 0 and 5.
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.apache.flink.contrib.operatorstatistics.OperatorStatsAccumulatorTest.testAccumulatorAllStatistics(OperatorStatsAccumulatorTest.java:151)
{code}
Full log [here|https://s3.amazonaws.com/archive.travis-ci.org/jobs/84469788/log.txt]. Maybe the test should set a constant seed to the {{Random}} object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2818) The *ReduceDriver classes have incorrect javadocs
Gabor Gevay created FLINK-2818: -- Summary: The *ReduceDriver classes have incorrect javadocs Key: FLINK-2818 URL: https://issues.apache.org/jira/browse/FLINK-2818 Project: Flink Issue Type: Bug Components: Distributed Runtime Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The javadocs of ReduceDriver, AllReduceDriver, AllGroupReduceDriver, GroupReduceDriver, GroupReduceCombineDriver, and GroupCombineChainedDriver are outdated and/or copy-pasted-then-not-correctly-modified, which makes deciphering what all these classes are more difficult. PR coming shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2809) DataSet[Unit] doesn't work
Gabor Gevay created FLINK-2809: -- Summary: DataSet[Unit] doesn't work Key: FLINK-2809 URL: https://issues.apache.org/jira/browse/FLINK-2809 Project: Flink Issue Type: Bug Components: Scala API Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The following code creates a DataSet\[Unit\]:
{code}
val env = ExecutionEnvironment.createLocalEnvironment()
val a = env.fromElements(1, 2, 3)
val b = a.map(_ => ())
b.writeAsText("/tmp/xxx")
env.execute()
{code}
This doesn't work, because a VoidSerializer is created, which can't cope with a BoxedUnit. See the exception below. I'm now thinking about creating a UnitSerializer class.
{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.lang.Void
	at org.apache.flink.api.common.typeutils.base.VoidSerializer.serialize(VoidSerializer.java:26)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
	at java.lang.Thread.run(Thread.java:745)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2806) No TypeInfo for Scala's Nothing type
Gabor Gevay created FLINK-2806: -- Summary: No TypeInfo for Scala's Nothing type Key: FLINK-2806 URL: https://issues.apache.org/jira/browse/FLINK-2806 Project: Flink Issue Type: Bug Components: Scala API Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor When writing some generic code, I encountered a situation where I needed a TypeInformation[Nothing]. Two problems prevent me from getting it: 1. TypeInformationGen.mkTypeInfo doesn't return a real TypeInformation[Nothing]. (It actually returns a casted null in that case.) 2. The line implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] does not fire in some situations when it should, when T = Nothing. (I guess this is a compiler bug.) I will open a PR shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
Gabor Gevay created FLINK-2662: -- Summary: CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators." Key: FLINK-2662 URL: https://issues.apache.org/jira/browse/FLINK-2662 Project: Flink Issue Type: Bug Components: Optimizer Affects Versions: master Reporter: Gabor Gevay I have a Flink program which throws the exception in the jira title. Full text: Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators. at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) at 
org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) at malom.Solver.main(Solver.java:66) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) The execution plan: http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt (I obtained this by commenting out the line that throws the exception) The code is here: https://github.com/ggevay/flink/tree/plan-generation-bug The class to run is "Solver". It needs a command-line argument: a directory where it will write its output. (On first run, it generates some lookup tables for a few minutes, which are then placed in /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2548) VertexCentricIteration should avoid doing a coGroup with the edges and the solution set
Gabor Gevay created FLINK-2548: -- Summary: VertexCentricIteration should avoid doing a coGroup with the edges and the solution set Key: FLINK-2548 URL: https://issues.apache.org/jira/browse/FLINK-2548 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9, 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Currently, the performance of vertex-centric iteration is suboptimal in iterations where the workset is small, because the cost of one iteration includes terms proportional to the number of edges and vertices of the whole graph, due to two coGroups: VertexCentricIteration.buildMessagingFunction does a coGroup between the edges and the workset to get the neighbors to the messaging UDF. This is problematic from a performance point of view, because the coGroup UDF gets called on all the edge groups, including those that are not getting any messages. An analogous problem is present in VertexCentricIteration.createResultSimpleVertex at the creation of the updates: a coGroup happens between the messages and the solution set, whose cost includes a term proportional to the number of vertices of the graph. Both of these coGroups could be avoided by doing a join instead (with the same keys that the coGroup uses), and then a groupBy. The cost of these operations would be dominated by the size of the workset, as opposed to the number of edges or vertices of the graph. To achieve this, the joins should have the edges and the solution set on the build side. (They will not be rebuilt at every iteration.) I made some experiments with this, and the initial results seem promising. On some workloads this achieves a 2x speedup, because later iterations often have quite small worksets, and these get a huge speedup from this change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
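The cost difference described in FLINK-2548 can be sketched without Flink. The following is a schematic, non-Gelly illustration (all names are made up for this example): a coGroup-style pass invokes its UDF once per edge group, active or not, while a hash-join probe driven by the workset only touches the vertices that actually received a message.

```java
import java.util.*;

// Schematic illustration of coGroup vs join cost in a vertex-centric iteration.
public class CoGroupVsJoin {
    // edges grouped by source vertex (the "build side", built once)
    public static Map<Long, List<Long>> edges = new HashMap<>();

    public static int coGroupStyle(Set<Long> workset) {
        int udfCalls = 0;
        for (Map.Entry<Long, List<Long>> group : edges.entrySet()) {
            udfCalls++; // UDF invoked for EVERY edge group, active or not
        }
        return udfCalls;
    }

    public static int joinStyle(Set<Long> workset) {
        int udfCalls = 0;
        for (Long v : workset) {          // probe side: only the (small) workset
            if (edges.containsKey(v)) udfCalls++;
        }
        return udfCalls;
    }

    public static void main(String[] args) {
        // a ring graph with 1000 vertices, but only 3 active vertices
        for (long v = 0; v < 1000; v++) edges.put(v, Arrays.asList((v + 1) % 1000));
        Set<Long> workset = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(coGroupStyle(workset)); // 1000 -- proportional to |V|
        System.out.println(joinStyle(workset));    // 3 -- proportional to |workset|
    }
}
```

In later iterations the workset shrinks, which is exactly where the join-based variant wins.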
[jira] [Created] (FLINK-2542) It should be documented that it is required from a join key to override hashCode(), when it is not a POJO
Gabor Gevay created FLINK-2542: -- Summary: It should be documented that it is required from a join key to override hashCode(), when it is not a POJO Key: FLINK-2542 URL: https://issues.apache.org/jira/browse/FLINK-2542 Project: Flink Issue Type: Bug Components: Gelly, Java API Reporter: Gabor Gevay Priority: Minor Fix For: 0.10, 0.9.1 If the join key is not a POJO, and does not override hashCode, then the join silently fails (produces empty output). I don't see this documented anywhere. The Gelly documentation should also have this info separately, because it does joins internally on the vertex IDs, but the user might not know this, or might not look at the join documentation when using Gelly. Here is example code:
{noformat}
public static class ID implements Comparable<ID> {
	public long foo;

	//no default ctor --> not a POJO
	public ID(long foo) { this.foo = foo; }

	@Override
	public int compareTo(ID o) { return ((Long)foo).compareTo(o.foo); }

	@Override
	public boolean equals(Object o0) {
		if(o0 instanceof ID) {
			ID o = (ID)o0;
			return foo == o.foo;
		} else {
			return false;
		}
	}

	@Override
	public int hashCode() { return 42; }
}

public static void main(String[] args) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple2<ID, Long>> inDegrees = env.fromElements(Tuple2.of(new ID(123l), 4l));
	DataSet<Tuple2<ID, Long>> outDegrees = env.fromElements(Tuple2.of(new ID(123l), 5l));

	DataSet<Tuple3<ID, Long, Long>> degrees = inDegrees.join(outDegrees, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
		.with(new FlatJoinFunction<Tuple2<ID, Long>, Tuple2<ID, Long>, Tuple3<ID, Long, Long>>() {
			@Override
			public void join(Tuple2<ID, Long> first, Tuple2<ID, Long> second, Collector<Tuple3<ID, Long, Long>> out) {
				out.collect(new Tuple3<ID, Long, Long>(first.f0, first.f1, second.f1));
			}
		}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");

	System.out.println("degrees count: " + degrees.count());
}
{noformat}
This prints 1, but if I comment out the hashCode, it prints 0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
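The silent mismatch in FLINK-2542 can be reproduced without Flink. A hash-based join relies on equal keys having equal hash codes; with equals() but no hashCode(), two equal instances (almost certainly) land in different hash buckets and the match is silently lost. The sketch below uses a plain HashMap as a stand-in for the join's build side:

```java
import java.util.*;

// Standalone demo (no Flink) of why a hashed join drops matches
// when the key type overrides equals() but not hashCode().
public class HashJoinKeyDemo {
    public static class ID {
        public final long foo;
        public ID(long foo) { this.foo = foo; }
        @Override public boolean equals(Object o) {
            return o instanceof ID && ((ID) o).foo == foo;
        }
        // hashCode() deliberately NOT overridden -> identity hash
    }

    public static void main(String[] args) {
        Map<ID, Long> buildSide = new HashMap<>();
        buildSide.put(new ID(123L), 4L);
        // probe with a different-but-equal instance, as a join would:
        System.out.println(buildSide.get(new ID(123L))); // null: match lost
    }
}
```

With hashCode() overridden consistently with equals(), the lookup finds the entry, which mirrors the "prints 1 vs prints 0" behavior described above.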
[jira] [Created] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop
Gabor Gevay created FLINK-2540: -- Summary: LocalBufferPool.requestBuffer gets into infinite loop Key: FLINK-2540 URL: https://issues.apache.org/jira/browse/FLINK-2540 Project: Flink Issue Type: Bug Reporter: Gabor Gevay I'm trying to run a complicated computation that looks like this: [1]. One of the DataSource->Filter->Map chains finishes fine, but the other one freezes. Debugging shows that it is spinning in the while loop in LocalBufferPool.requestBuffer. askToRecycle is false. Both numberOfRequestedMemorySegments and currentPoolSize are 128, so it never goes into that if either. This is a stack trace: [2] And here is the code, if you would like to run it: [3]. Unfortunately, I can't make it more minimal, because if I remove some operators, the problem disappears. The class to start is malom.Solver. (On first run, it calculates some lookup tables for a few minutes, and puts them into /tmp/movegen) [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt [3] https://github.com/ggevay/flink/tree/deadlock-malom -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
Gabor Gevay created FLINK-2527: -- Summary: If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set Key: FLINK-2527 URL: https://issues.apache.org/jira/browse/FLINK-2527 Project: Flink Issue Type: Bug Components: Gelly Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 0.10, 0.9.1 The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable. I see three ways to resolve this: 1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this. 2. In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns. 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.) I like 2. the best. What are your opinions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
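Option 2 from FLINK-2527 can be sketched in a few lines. The names below are hypothetical (not the actual Gelly classes): the context buffers the value passed to setNewVertexValue instead of emitting it immediately, and the framework flushes only the last recorded value after updateVertex returns, so the workset never sees intermediate values.

```java
import java.util.*;

// Sketch of "record in outVal, emit the last value after updateVertex returns".
public class LastValueWins {
    public static class VertexUpdateContext<VV> {
        private VV outVal;
        private boolean updated;

        public void setNewVertexValue(VV value) { // record, do not emit yet
            outVal = value;
            updated = true;
        }

        // called by the framework after updateVertex returns
        public List<VV> flush() {
            return updated ? Collections.singletonList(outVal)
                           : Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        VertexUpdateContext<Integer> ctx = new VertexUpdateContext<>();
        ctx.setNewVertexValue(1); // intermediate value, overwritten
        ctx.setNewVertexValue(2);
        System.out.println(ctx.flush()); // [2]
    }
}
```

This keeps a single value per vertex in the workset regardless of how many times the UDF calls setNewVertexValue.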
[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
Gabor Gevay created FLINK-2447: -- Summary: TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Reporter: Gabor Gevay Consider the following code:
{code}
DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
	@Override
	public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
		return null;
	}
});
{code}
where FooBarPojo is the following type:
{code}
public class FooBarPojo {
	public int foo, bar;
	public FooBarPojo() {}
}
{code}
This should print a tuple type with two identical fields: Java Tuple2<PojoType<FooBarPojo>, PojoType<FooBarPojo>> But it prints the following instead: Java Tuple2<PojoType<FooBarPojo>, GenericType<FooBarPojo>> Note that this problem causes some co-groups in Gelly to crash with "org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys are not compatible with each other" when the vertex ID type is a POJO, because the second field of the Edge type gets to be a generic type, but the POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns different numbers for the POJO and the generic type. The source of the problem is the mechanism in TypeExtractor that would detect recursive types (see the "alreadySeen" field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field. Specifically the following happens: createTypeInfoWithTypeHierarchy starts to process the Tuple2 type, and in line 434 it calls itself for the first field, which proceeds into the privateGetForClass case which correctly detects that it is a POJO, and correctly returns a PojoTypeInfo; but in the meantime in line 1191, privateGetForClass adds PojoTypeInfo to "alreadySeen". Then the outer createTypeInfoWithTypeHierarchy approaches the second field, goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1187, that a recursive type is being processed. 
(Note that if we comment out the recursive-type detection (the lines that use the alreadySeen field), then the output is correct.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2437) TypeExtractor.analyzePojo has some problems around the default constructor detection
Gabor Gevay created FLINK-2437: -- Summary: TypeExtractor.analyzePojo has some problems around the default constructor detection Key: FLINK-2437 URL: https://issues.apache.org/jira/browse/FLINK-2437 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor If a class does have a default constructor, but the user forgot to make it public, then TypeExtractor.analyzePojo still thinks everything is OK, so it creates a PojoTypeInfo. Then PojoSerializer.createInstance blows up. Furthermore, a "return null" seems to be missing from the then-branch of the if that follows the catch of the NoSuchMethodException; this would also cause a headache for PojoSerializer. An additional minor issue is that the word "class" is printed twice in several places, because Class.toString already prepends it to the class name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2378) complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows
Gabor Gevay created FLINK-2378: -- Summary: complexIntegrationTest1 and testGroupByFeedback sometimes fail on Windows Key: FLINK-2378 URL: https://issues.apache.org/jira/browse/FLINK-2378 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gabor Gevay Priority: Minor This only happens when Maven runs the test, I can't trigger it from the IDE. At first I thought that something is wrong with having shared variables between the tests in ComplexIntegrationTest, but when I eliminated the shared variables, the test still fails. The error msgs: testGroupByFeedback(org.apache.flink.streaming.api.IterateTest) Time elapsed: 12.091 sec <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:169) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at 
akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertTrue(Assert.java:52) at org.apache.flink.streaming.api.IterateTest$6.close(IterateTest.java:447) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75) at org.apache.flink.streaming.runtime.tasks.StreamTask.closeOperator(StreamTask.java:182) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:112) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577) at java.lang.Thread.run(Thread.java:745) complexIntegrationTest1(org.apache.flink.streaming.api.complex.ComplexIntegrationTest) Time elapsed: 15.989 sec <<< FAILURE! java.lang.AssertionError: Different number of lines in expected and obtained result. expected:<9> but was:<5> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:743) at org.junit.Assert.assertEquals(Assert.java:118) at org.junit.Assert.assertEquals(Assert.java:555) at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:272) at org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:258) at org.apache.flink.streaming.api.complex.ComplexIntegrationTest.after(ComplexIntegrationTest.java:91) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
Gabor Gevay created FLINK-2377: -- Summary: AbstractTestBase.deleteAllTempFiles sometimes fails on Windows Key: FLINK-2377 URL: https://issues.apache.org/jira/browse/FLINK-2377 Project: Flink Issue Type: Bug Components: Tests Environment: Windows Reporter: Gabor Gevay Priority: Minor This is probably another file-closing issue (that is, Windows won't delete open files, unlike Linux). I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2376) testFindConnectableAddress sometimes fails on Windows because of the time limit
Gabor Gevay created FLINK-2376: -- Summary: testFindConnectableAddress sometimes fails on Windows because of the time limit Key: FLINK-2376 URL: https://issues.apache.org/jira/browse/FLINK-2376 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2374) File.setWritable doesn't work on Windows, which makes several tests fail
Gabor Gevay created FLINK-2374: -- Summary: File.setWritable doesn't work on Windows, which makes several tests fail Key: FLINK-2374 URL: https://issues.apache.org/jira/browse/FLINK-2374 Project: Flink Issue Type: Bug Environment: Windows Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Trivial The tests that use setWritable to test the handling of certain error conditions should be skipped in case we are under Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2369) On Windows, in testFailingSortingDataSinkTask the temp file is not removed
Gabor Gevay created FLINK-2369: -- Summary: On Windows, in testFailingSortingDataSinkTask the temp file is not removed Key: FLINK-2369 URL: https://issues.apache.org/jira/browse/FLINK-2369 Project: Flink Issue Type: Bug Components: Distributed Runtime Environment: Windows 7 64-bit, JDK 8u51 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The test fails with the assert at the very end ("Temp output file has not been removed"). This happens because FileOutputFormat.tryCleanupOnError can't delete the file, because it is still open (note that Linux happily deletes open files). A fix would be to call this.format.close() not just in the finally block (in DataSinkTask.invoke), but also before the tryCleanupOnError call (around line 217). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2359) Add factory methods to the Java TupleX types
Gabor Gevay created FLINK-2359: -- Summary: Add factory methods to the Java TupleX types Key: FLINK-2359 URL: https://issues.apache.org/jira/browse/FLINK-2359 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.10 Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The compiler doesn't infer generic type arguments from constructor arguments, which means that we have to call Tuple constructors like this: Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo"); I propose adding a factory method, which would provide the following alternative: Tuple2<Integer, String> t = Tuple2.create(5, "foo"); (Note that C++ and C# Tuples also have similar factory methods for the same reason.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
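A minimal sketch of the proposed factory (using a toy Tuple2, not Flink's class): Java infers the type arguments of a generic *method* from its arguments, which it does not do for constructors before the Java 7 diamond operator.

```java
// Toy Tuple2 demonstrating why a static factory is more pleasant to call.
public class TupleFactoryDemo {
    public static class Tuple2<T0, T1> {
        public T0 f0;
        public T1 f1;
        public Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }

        // factory method: <T0, T1> is inferred at the call site
        public static <T0, T1> Tuple2<T0, T1> create(T0 f0, T1 f1) {
            return new Tuple2<T0, T1>(f0, f1);
        }
    }

    public static void main(String[] args) {
        // verbose: type arguments repeated in the constructor call
        Tuple2<Integer, String> a = new Tuple2<Integer, String>(5, "foo");
        // proposed: type arguments inferred
        Tuple2<Integer, String> b = Tuple2.create(5, "foo");
        System.out.println(b.f0 + " " + b.f1); // 5 foo
    }
}
```

This mirrors std::make_tuple in C++ and Tuple.Create in C#, mentioned above.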
[jira] [Created] (FLINK-2255) In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec
Gabor Gevay created FLINK-2255: -- Summary: In the TopSpeedWindowing examples, every window contains only 1 element, because event time is in millisec, but eviction is in sec Key: FLINK-2255 URL: https://issues.apache.org/jira/browse/FLINK-2255 Project: Flink Issue Type: Bug Components: Examples, Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The event times are generated by System.currentTimeMillis(), so evictionSec should be multiplied by 1000, when passing it to Time.of. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2181) SessionWindowing example has a memleak
Gabor Gevay created FLINK-2181: -- Summary: SessionWindowing example has a memleak Key: FLINK-2181 URL: https://issues.apache.org/jira/browse/FLINK-2181 Project: Flink Issue Type: Bug Components: Examples, Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay The trigger policy objects belonging to already terminated sessions are kept in memory, and also NotifyOnLastGlobalElement gets called on them. This causes the program to eat up more and more memory, and also to get slower with time. Mailing list discussion: http://mail-archives.apache.org/mod_mbox/flink-dev/201505.mbox/%3CCADXjeyBW9uaA=ayde+9r+qh85pblyavuegsr7h8pwkyntm9...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2148) Approximately calculate the number of distinct elements of a stream
Gabor Gevay created FLINK-2148: -- Summary: Approximately calculate the number of distinct elements of a stream Key: FLINK-2148 URL: https://issues.apache.org/jira/browse/FLINK-2148 Project: Flink Issue Type: Sub-task Reporter: Gabor Gevay Priority: Minor In the paper http://people.seas.harvard.edu/~minilek/papers/f0.pdf Kane et al. describe an optimal algorithm for estimating the number of distinct elements in a data stream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
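For a feel of the problem, here is a much simpler sketch than the Kane et al. algorithm: a k-minimum-values (KMV) estimator. It is explicitly not the algorithm from the paper, only an illustration of how distinct counts can be estimated in O(k) memory: keep the k smallest hash values seen; if the k-th smallest, as a fraction of the hash range, is h, then roughly (k-1)/h distinct elements were seen.

```java
import java.util.TreeSet;

// k-minimum-values sketch: estimates the number of distinct elements
// in O(k) memory (illustrative alternative, not the Kane et al. algorithm).
public class KmvDistinctCount {
    private final int k;
    private final TreeSet<Long> minHashes = new TreeSet<>();

    public KmvDistinctCount(int k) { this.k = k; }

    private static long mix(long x) { // MurmurHash3 finalizer, a strong mixer
        x ^= x >>> 33; x *= 0xff51afd7ed558ccdL;
        x ^= x >>> 33; x *= 0xc4ceb9fe1a85ec53L;
        x ^= x >>> 33;
        return x;
    }

    public void add(long element) {
        long h = mix(element) >>> 1;       // uniform in [0, 2^63)
        minHashes.add(h);                  // duplicates are absorbed by the set
        if (minHashes.size() > k) minHashes.pollLast(); // keep the k smallest
    }

    public double estimate() {
        if (minHashes.size() < k) return minHashes.size(); // exact while small
        double fraction = minHashes.last() / Math.pow(2, 63);
        return (k - 1) / fraction;
    }

    public static void main(String[] args) {
        KmvDistinctCount kmv = new KmvDistinctCount(256);
        for (long i = 0; i < 10000; i++) { kmv.add(i); kmv.add(i); } // dups don't count
        System.out.println(Math.round(kmv.estimate())); // roughly 10000
    }
}
```

The relative error is about 1/sqrt(k), so k trades memory against accuracy, which matches the "user-specified error bounds" goal of the parent issue.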
[jira] [Created] (FLINK-2147) Approximate calculation of frequencies in data streams
Gabor Gevay created FLINK-2147: -- Summary: Approximate calculation of frequencies in data streams Key: FLINK-2147 URL: https://issues.apache.org/jira/browse/FLINK-2147 Project: Flink Issue Type: Sub-task Reporter: Gabor Gevay Priority: Minor Count-Min sketch is a hashing-based algorithm for approximately keeping track of the frequencies of elements in a data stream. It is described by Cormode et al. in the following paper: http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf Note that this algorithm can be conveniently implemented in a distributed way, as described in section 3.2 of the paper. The paper http://www.vldb.org/conf/2002/S10P03.pdf also describes algorithms for approximately keeping track of frequencies, but there the user can specify a threshold below which she is not interested in the frequency of an element. The error bounds are also different from those of the Count-Min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
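A minimal Count-Min sketch, following the structure described by Cormode et al. (parameter choices and hashing here are illustrative): a depth x width array of counters with one hash function per row; the frequency estimate is the minimum over the rows, and by construction it never underestimates the true count.

```java
import java.util.Random;

// Minimal Count-Min sketch for approximate frequency counting.
public class CountMinSketch {
    private final int depth, width;
    private final long[][] counts;
    private final int[] seeds; // one hash seed per row

    public CountMinSketch(int depth, int width) {
        this.depth = depth;
        this.width = width;
        this.counts = new long[depth][width];
        this.seeds = new int[depth];
        Random rnd = new Random(42);
        for (int i = 0; i < depth; i++) seeds[i] = rnd.nextInt();
    }

    private int bucket(int row, Object item) {
        int h = item.hashCode() * 0x9E3779B9 + seeds[row];
        h ^= h >>> 16; // extra mixing
        return Math.floorMod(h, width);
    }

    public void add(Object item) {
        for (int i = 0; i < depth; i++) counts[i][bucket(i, item)]++;
    }

    // Estimate is min over rows: >= true count, <= true count + collisions.
    public long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) min = Math.min(min, counts[i][bucket(i, item)]);
        return min;
    }

    public static void main(String[] args) {
        CountMinSketch cm = new CountMinSketch(4, 272);
        for (int i = 0; i < 100; i++) cm.add("heavy");
        cm.add("light");
        System.out.println(cm.estimate("heavy") >= 100); // true: never underestimates
    }
}
```

Merging two sketches is element-wise addition of the counter arrays, which is what makes the distributed implementation from section 3.2 of the paper straightforward.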
[jira] [Created] (FLINK-2146) Fast calculation of min/max with arbitrary eviction and triggers
Gabor Gevay created FLINK-2146: -- Summary: Fast calculation of min/max with arbitrary eviction and triggers Key: FLINK-2146 URL: https://issues.apache.org/jira/browse/FLINK-2146 Project: Flink Issue Type: Sub-task Reporter: Gabor Gevay Priority: Minor The last algorithm described here could be used: http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html It is based on a double-ended queue which maintains a sorted list of elements of the current window that have the possibility of being the maximal element in the future.
Store: O(1) amortized
Evict: O(1)
emitWindow: O(1)
Memory: O(N)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
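The double-ended-queue technique from the linked post can be sketched as a PreReducer-like object (the store/evict/emitWindow names mirror the issue, the class itself is hypothetical): the deque holds, front to back, a decreasing sequence of the window elements that can still become the maximum.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window maximum via a monotonic deque: O(1) amortized store,
// O(1) evict, O(1) emitWindow, O(window size) memory.
public class SlidingWindowMax {
    private final Deque<long[]> deque = new ArrayDeque<>(); // {index, value}
    private long nextIndex = 0;   // index assigned to the next stored element
    private long oldestIndex = 0; // index of the oldest element still in window

    public void store(long value) {
        // elements <= value at the back can never be the maximum again
        while (!deque.isEmpty() && deque.peekLast()[1] <= value) deque.pollLast();
        deque.addLast(new long[]{nextIndex++, value});
    }

    public void evict() { // evict the oldest element of the window
        if (!deque.isEmpty() && deque.peekFirst()[0] == oldestIndex) deque.pollFirst();
        oldestIndex++;
    }

    public long emitWindow() { // maximum of the current window
        return deque.peekFirst()[1];
    }

    public static void main(String[] args) {
        // windows of size 3 over 1, 3, 2, 5, 4
        SlidingWindowMax m = new SlidingWindowMax();
        m.store(1); m.store(3); m.store(2);
        System.out.println(m.emitWindow()); // 3
        m.evict(); m.store(5);
        System.out.println(m.emitWindow()); // 5
        m.evict(); m.store(4);
        System.out.println(m.emitWindow()); // 5
    }
}
```

For minimum, flip the comparison in store; supporting arbitrary triggers/evictions works because store and evict are independent operations.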
[jira] [Created] (FLINK-2145) Median calculation for windows
Gabor Gevay created FLINK-2145: -- Summary: Median calculation for windows Key: FLINK-2145 URL: https://issues.apache.org/jira/browse/FLINK-2145 Project: Flink Issue Type: Sub-task Components: Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The PreReducer for this has the following algorithm: We maintain two multisets (as, for example, balanced binary search trees), that always partition the elements of the current window to smaller-than-median and larger-than-median elements. At each store and evict, we can maintain this invariant with only O(1) multiset operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
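The two-multiset PreReducer described above can be sketched as follows (a hypothetical standalone class, not the actual Flink PreReducer interface): `low` holds the smaller-or-equal-to-median half, `high` the larger half, each as a TreeMap<value, count> multiset; after each store/evict, a rebalance moves at most one element between the two sides, so every operation is O(log n).

```java
import java.util.TreeMap;

// Window median via two multisets partitioning the window around the median.
public class WindowMedian {
    private final TreeMap<Long, Integer> low = new TreeMap<>(), high = new TreeMap<>();
    private int lowSize = 0, highSize = 0;

    private static void inc(TreeMap<Long, Integer> m, long v) { m.merge(v, 1, Integer::sum); }
    private static void dec(TreeMap<Long, Integer> m, long v) {
        if (m.merge(v, -1, Integer::sum) == 0) m.remove(v);
    }

    public void store(long v) {
        if (lowSize == 0 || v <= low.lastKey()) { inc(low, v); lowSize++; }
        else { inc(high, v); highSize++; }
        rebalance();
    }

    public void evict(long v) { // precondition: v is in the current window
        if (lowSize > 0 && v <= low.lastKey()) { dec(low, v); lowSize--; }
        else { dec(high, v); highSize--; }
        rebalance();
    }

    // invariant: lowSize == highSize or lowSize == highSize + 1
    private void rebalance() {
        if (lowSize > highSize + 1) {
            long v = low.lastKey(); dec(low, v); lowSize--; inc(high, v); highSize++;
        } else if (highSize > lowSize) {
            long v = high.firstKey(); dec(high, v); highSize--; inc(low, v); lowSize++;
        }
    }

    public long median() { return low.lastKey(); } // lower median

    public static void main(String[] args) {
        WindowMedian m = new WindowMedian();
        m.store(5); m.store(1); m.store(3);
        System.out.println(m.median()); // 3 (window {1, 3, 5})
        m.evict(1); m.store(10);
        System.out.println(m.median()); // 5 (window {3, 5, 10})
    }
}
```

Balanced BSTs with subtree sizes would give the same bounds; TreeMap-as-multiset just keeps the sketch short.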
[jira] [Created] (FLINK-2144) Implement count, average, and variance for windows
Gabor Gevay created FLINK-2144: -- Summary: Implement count, average, and variance for windows Key: FLINK-2144 URL: https://issues.apache.org/jira/browse/FLINK-2144 Project: Flink Issue Type: Sub-task Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor By count I mean the number of elements in the window. These can be implemented very efficiently building on FLINK-2143. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
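A sketch of how count, average and variance follow from FLINK-2143-style maintenance (hypothetical class, not the actual implementation): keep count, sum and sum of squares; since subtraction is the inverse of addition, store and evict are both O(1), and the statistics fall out in O(1) at emit time.

```java
// O(1) store/evict window statistics from count, sum and sum of squares.
public class WindowStats {
    private long count;
    private double sum, sumSq;

    public void store(double v) { count++; sum += v; sumSq += v * v; }
    public void evict(double v) { count--; sum -= v; sumSq -= v * v; }

    public long count() { return count; }

    public double average() { return sum / count; }

    public double variance() { // population variance of the current window
        double mean = sum / count;
        return sumSq / count - mean * mean;
    }

    public static void main(String[] args) {
        WindowStats s = new WindowStats();
        s.store(1); s.store(2); s.store(3); s.store(4);
        s.evict(1); // window is now {2, 3, 4}
        System.out.println(s.count());    // 3
        System.out.println(s.average());  // 3.0
        System.out.println(s.variance()); // 0.666... (= 2/3)
    }
}
```

Note that the sumSq - mean^2 formula can suffer catastrophic cancellation for windows with a large mean and small variance; a production version would want a numerically stabler scheme.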
[jira] [Created] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
Gabor Gevay created FLINK-2143: -- Summary: Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter Key: FLINK-2143 URL: https://issues.apache.org/jira/browse/FLINK-2143 Project: Flink Issue Type: Sub-task Reporter: Gabor Gevay Assignee: Gabor Gevay If the inverse of the reduceFunction is also available (for example subtraction when summing numbers), then a PreReducer can maintain the aggregate in O(1) memory and O(1) time for evict, store, and emitWindow. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
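The proposed overload can be sketched as follows (the class and method names are illustrative, not the actual Flink API): alongside the reduce function, the caller supplies its inverse, and the PreReducer then only keeps the running aggregate, so eviction never re-scans the window.

```java
import java.util.function.BinaryOperator;

// PreReducer using an inverse reduce function: O(1) memory and O(1) time
// for store, evict and emitWindow.
public class InvertiblePreReducer<T> {
    private final BinaryOperator<T> reduce, inverse;
    private T aggregate; // aggregate of the current window, null when empty

    public InvertiblePreReducer(BinaryOperator<T> reduce, BinaryOperator<T> inverse) {
        this.reduce = reduce;
        this.inverse = inverse;
    }

    public void store(T value) {
        aggregate = (aggregate == null) ? value : reduce.apply(aggregate, value);
    }

    public void evict(T value) { // O(1): undo the element's contribution
        aggregate = inverse.apply(aggregate, value);
    }

    public T emitWindow() { return aggregate; }

    public static void main(String[] args) {
        // summing numbers; subtraction is the inverse
        InvertiblePreReducer<Integer> r =
            new InvertiblePreReducer<>((a, b) -> a + b, (a, b) -> a - b);
        r.store(1); r.store(2); r.store(3);
        System.out.println(r.emitWindow()); // 6
        r.evict(1);
        System.out.println(r.emitWindow()); // 5
    }
}
```

Without the inverse, a general reduce would need the whole window buffered (or a tree of partial aggregates) to handle eviction.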
[jira] [Created] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows
Gabor Gevay created FLINK-2142: -- Summary: GSoC project: Exact and Approximate Statistics for Data Streams and Windows Key: FLINK-2142 URL: https://issues.apache.org/jira/browse/FLINK-2142 Project: Flink Issue Type: New Feature Components: Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor The goal of this project is to implement basic statistics of data streams and windows (like average, median, variance, correlation, etc.) in a computationally efficient manner. This involves designing custom preaggregators. The exact calculation of some statistics (e.g. frequencies, or the number of distinct elements) would require memory proportional to the number of elements in the input (the window or the entire stream). However, there are efficient algorithms and data structures using less memory for calculating the same statistics only approximately, with user-specified error bounds. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2121) FileInputFormat.addFilesInDir miscalculates total size
Gabor Gevay created FLINK-2121: -- Summary: FileInputFormat.addFilesInDir miscalculates total size Key: FLINK-2121 URL: https://issues.apache.org/jira/browse/FLINK-2121 Project: Flink Issue Type: Bug Components: Core Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor In FileInputFormat.addFilesInDir, the length variable should start from 0, because the return value is always consumed by adding it to the caller's length (instead of just assigning it). So with the current version, the length accumulated before the call is counted twice in the result. mvn verify caught this for me just now. The reason why this hasn't been noticed yet is that testGetStatisticsMultipleNestedFiles catches it only if it gets the listing of the outer directory in a certain order. Concretely, if the inner directory is seen before the other file in the outer directory, then length is 0 at that point, so the bug doesn't show. But if the other file is seen first, then its size is added twice to the total result. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
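The accumulator bug pattern described in FLINK-2121 can be shown in isolation (this is an illustration, not the actual FileInputFormat code): the helper both receives the running length and returns it, so the caller's `length += helper(...)` counts the pre-call length twice.

```java
// Self-contained illustration of the double-counting accumulator bug.
public class LengthAccumulatorBug {
    // buggy shape: returns its input plus the directory's total
    public static long addFilesInDirBuggy(long[] fileSizes, long length) {
        for (long s : fileSizes) length += s;
        return length;
    }

    // fixed shape: starts from 0; the caller adds the return value
    public static long addFilesInDirFixed(long[] fileSizes, long ignored) {
        long length = 0;
        for (long s : fileSizes) length += s;
        return length;
    }

    public static void main(String[] args) {
        long[] dir = {10, 20};
        long length = 100; // size of a sibling file seen before the directory
        long buggy = length + addFilesInDirBuggy(dir, length);
        long fixed = length + addFilesInDirFixed(dir, length);
        System.out.println(buggy); // 230 -- the pre-call 100 is counted twice
        System.out.println(fixed); // 130
    }
}
```

This also shows why the test only fails for one listing order: if the directory is visited first, the pre-call length is 0 and doubling it is harmless.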
[jira] [Created] (FLINK-2114) PunctuationPolicy.toString() throws NullPointerException if extractor is null
Gabor Gevay created FLINK-2114: -- Summary: PunctuationPolicy.toString() throws NullPointerException if extractor is null Key: FLINK-2114 URL: https://issues.apache.org/jira/browse/FLINK-2114 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gabor Gevay Assignee: Gabor Gevay Priority: Minor Parentheses are missing in PunctuationPolicy.toString() around the conditional operator that checks for a non-null extractor, which makes the condition always true. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
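The precedence trap behind FLINK-2114 is easy to reproduce (the code below is illustrative, not the actual PunctuationPolicy source): string concatenation binds tighter than `!=` and the ternary operator, so without parentheses the condition becomes `("..." + extractor) != null`, which is always true.

```java
// Demo of the missing-parentheses bug around a ternary null check.
public class TernaryPrecedenceDemo {
    public static String buggy(Object extractor) {
        // parses as: (("extractor: " + extractor) != null) ? ... : ...
        // -> condition is always true, so extractor.toString() is always called
        return "extractor: " + extractor != null ? extractor.toString() : "null";
    }

    public static String fixed(Object extractor) {
        return "extractor: " + (extractor != null ? extractor.toString() : "null");
    }

    public static void main(String[] args) {
        System.out.println(fixed(null)); // extractor: null
        // buggy(null) throws NullPointerException: the always-true condition
        // leads straight into extractor.toString()
    }
}
```

Note a second symptom of the buggy version: because the concatenation is swallowed into the condition, the "extractor: " prefix never appears in its result at all.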