[jira] [Created] (FLINK-6595) Nested SQL queries do not expose proctime / rowtime attributes
Haohui Mai created FLINK-6595:
---------------------------------

Summary: Nested SQL queries do not expose proctime / rowtime attributes
Key: FLINK-6595
URL: https://issues.apache.org/jira/browse/FLINK-6595
Project: Flink
Issue Type: Bug
Reporter: Haohui Mai
Assignee: Haohui Mai
Fix For: 1.3.0

We found out that group windows cannot be applied to nested queries out of the box:

{noformat}
SELECT *
FROM (
  (SELECT ...)
  UNION ALL
  (SELECT ...)
)
GROUP BY foo, TUMBLE(proctime, ...)
{noformat}

Flink complains that {{proctime}} is undefined.
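For illustration, a minimal reproduction might look like the following (a sketch only; the table names, fields, and input streams are hypothetical, using the 1.3-era Java APIs):

{code}
// assumes the usual imports from org.apache.flink.streaming.api.* and org.apache.flink.table.api.*
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// declare a processing-time attribute "proctime" on both inputs
tEnv.registerDataStream("t1", stream1, "foo, bar, proctime.proctime");
tEnv.registerDataStream("t2", stream2, "foo, bar, proctime.proctime");

// the group window on top of the UNION ALL fails to resolve "proctime"
Table result = tEnv.sql(
    "SELECT foo, COUNT(bar) FROM (" +
    "  SELECT foo, bar, proctime FROM t1" +
    "  UNION ALL" +
    "  SELECT foo, bar, proctime FROM t2" +
    ") GROUP BY foo, TUMBLE(proctime, INTERVAL '10' SECOND)");
{code}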
[jira] [Created] (FLINK-6594) Implement Flink Dispatcher for Kubernetes
Larry Wu created FLINK-6594:
---------------------------------

Summary: Implement Flink Dispatcher for Kubernetes
Key: FLINK-6594
URL: https://issues.apache.org/jira/browse/FLINK-6594
Project: Flink
Issue Type: New Feature
Components: Cluster Management
Environment: FLIP-6 feature branch
Reporter: Larry Wu

This task is to implement a Flink Dispatcher for Kubernetes, which is deployed to a Kubernetes cluster as a long-running pod. The Flink Dispatcher accepts job submissions from Flink clients and asks the Kubernetes API Server to create and monitor a virtual cluster consisting of a Flink JobManager pod and Flink TaskManager pods.
Re: [DISCUSS] Release 1.3.0 RC1 (Non voting, testing release candidate)
Thanks Robert,

Just for the record, I think there are still some problems with incremental snapshots; I think Stefan is still working on it. I added some comments to https://issues.apache.org/jira/browse/FLINK-6537

Gyula

Robert Metzger wrote (on Mon, 15 May 2017, 19:41):

> Hi Devs,
>
> This is the second non-voting RC. The last RC had some big issues, making
> it hard to start Flink locally. I hope this RC proves to be more stable.
>
> I hope to create the first voting RC by end of this week.
>
> -
>
> The release commit is 3659a82f553fedf8afe8b5fae75922075fe17e85
>
> The artifacts are located here:
> http://people.apache.org/~rmetzger/flink-1.3.0-rc1/
>
> The maven staging repository is here:
> https://repository.apache.org/content/repositories/orgapacheflink-1119
>
> -
>
> Happy testing!
>
> Regards,
> Robert
Static deployment scheme for failure scenario
Hello Flink Community,

Is it possible to force a job into a particular task-node placement? I want to run a failure scenario which depends on the machines' configuration. To my knowledge, the current scheduler deploys tasks in a first-come-first-served fashion. In that case, the only way to fix the deployment scheme would be to change the scheduler, right? Did anyone attempt this so far?

Best,
Niklas

--
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/
[DISCUSS] Release 1.3.0 RC1 (Non voting, testing release candidate)
Hi Devs,

This is the second non-voting RC. The last RC had some big issues, making it hard to start Flink locally. I hope this RC proves to be more stable.

I hope to create the first voting RC by end of this week.

-

The release commit is 3659a82f553fedf8afe8b5fae75922075fe17e85

The artifacts are located here:
http://people.apache.org/~rmetzger/flink-1.3.0-rc1/

The maven staging repository is here:
https://repository.apache.org/content/repositories/orgapacheflink-1119

-

Happy testing!

Regards,
Robert
[jira] [Created] (FLINK-6593) Fix Bug in ProctimeAttribute or RowtimeAttribute with join UDTF
Ruidong Li created FLINK-6593:
---------------------------------

Summary: Fix Bug in ProctimeAttribute or RowtimeAttribute with join UDTF
Key: FLINK-6593
URL: https://issues.apache.org/jira/browse/FLINK-6593
Project: Flink
Issue Type: Bug
Reporter: Ruidong Li
Assignee: Ruidong Li

When a Table has a ProctimeAttribute or RowtimeAttribute and is joined with a UDTF, the {{resultNames}} parameter of the {{generateResultExpression}} method in class {{CodeGenerator}} does not filter out the time indicators, causing a {{CodeGenException("Arity of result field names does not match number of expressions.")}}.
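A minimal sketch of the failing pattern (all names are hypothetical; the table-function join syntax is assumed to follow the 1.3-era Java Table API):

{code}
// hypothetical table function splitting a string into words
tEnv.registerFunction("split", new SplitUDTF());

// "src" was registered with an appended time indicator field 'p'
Table t = tEnv.scan("src"); // fields: a, b, p (proctime indicator)

// the cross join with the UDTF fails during code generation with
// CodeGenException("Arity of result field names does not match number of expressions.")
// because 'p' is not filtered from the result field names
Table result = t.join("split(a) as (word)").select("a, word");
{code}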
[jira] [Created] (FLINK-6591) Extend functionality of final ConversionMapper
Timo Walther created FLINK-6591:
---------------------------------

Summary: Extend functionality of final ConversionMapper
Key: FLINK-6591
URL: https://issues.apache.org/jira/browse/FLINK-6591
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

The functionality of the ConversionMapper generated in {{TableEnvironment#generateRowConverterFunction}} is very limited right now:

- It does not support conversion of nested Row types. E.g., a nested Avro record can be read with the KafkaAvroTableSource into a nested row structure, but this structure cannot be converted back into a POJO or Avro structure. The code generator needs to be extended for this (see the sketch below).
- The Table API supports BasicTypeInfo (e.g. {{Integer[]}}) as an input field, but since it works with ObjectTypeInfo internally, it cannot output the array. I disabled the test {{TableEnvironmentITCase#testAsFromTupleToPojo}} for now.
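A sketch of the first limitation, with hypothetical POJOs mirroring a nested Avro record:

{code}
// hypothetical nested POJOs
public class Address { public String city; }
public class User { public String name; public Address address; }

// reading into a nested Row structure works (e.g. via the KafkaAvroTableSource),
// but the generated ConversionMapper cannot translate the nested
// Row(city) back into the Address POJO:
Table users = tEnv.scan("users"); // schema: name: String, address: Row(city: String)
DataSet<User> result = tEnv.toDataSet(users, User.class); // fails today
{code}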
Re: questions about Flink's HashJoin performance
The Flink version is 1.2.0

On Mon, May 15, 2017 at 10:24 PM, weijie tong wrote:

> @Till thanks for your reply.
>
> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
> It just uses the MutableHashTable class; there's no other Flink
> configuration. The main code body is:
>
>> this.recordBuildSideAccessor = RecordSerializer.get();
>> this.recordProbeSideAccessor = RecordSerializer.get();
>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>> final Class[] keyType = (Class[]) new Class[]{BytesValue.class};
>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>> Sequence buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>> Sequence probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>> List memorySegments;
>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>> try {
>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>> }
>> catch (MemoryAllocationException e) {
>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>   Throwables.propagate(e);
>>   return;
>> }
>> try {
>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>   join = new MutableHashTable(
>>       recordBuildSideAccessor,
>>       recordProbeSideAccessor,
>>       recordBuildSideComparator,
>>       recordProbeSideComparator,
>>       pactRecordComparator,
>>       memorySegments,
>>       ioManager
>>   );
>>   join.open(buildInput, probeInput);
>>
>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>
> The BytesValue type is a self-defined one which holds a byte[]; just like
> the original StringValue, it has the same serde performance.
>
> while (join.nextRecord()) {
>   Record currentProbeRecord = join.getCurrentProbeRecord();
>   MutableObjectIterator buildSideIterator = join.getBuildSideIterator();
>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>     rowNum++;
>   }
> }
>
> I have tried both the Record and Row classes as the record type without
> any performance improvement. I also tried batching the input records:
> the buildInput and probeInput variables of the first code block iterate
> one Record at a time from a batch of Records. The batched records'
> content stays in memory in Drill's ValueVector format. Once a record
> needs to participate in the build or probe phase via an iterator next()
> call, it is fetched from the batched in-memory ValueVector content.
> But no performance gains.
>
> The top hotspot profile from JProfiler is below:
>
> Hot spot, "Self time (microseconds)", "Average Time", "Invocations"
> org.apache.flink.types.Record.serialize,                           1014127, n/a, n/a
> org.apache.flink.types.Record.deserialize,                           60684, n/a, n/a
> org.apache.flink.types.Record.copyTo,                                83007, n/a, n/a
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,       55238, n/a, n/a
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord, 10955, n/a, n/a
> org.apache.flink.runtime.memory.MemoryManager.release,               33484, n/a, n/a
> org.apache.flink.runtime.memory.MemoryManager.allocatePages,        104259, n/a, n/a
>
> My log shows that the hashjoin.open() method costs too much time:
>
> construct hash table elapsed:1885ms
>
> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann wrote:
>
>> Hi Weijie,
>>
>> it might be the case that batching the processing of multiple rows can
>> give you an improved performance compared to single row processing.
>>
>> Maybe you could share the exact benchmark base line results and the code
>> you use to test Flink's MutableHashTable with us. Also the Flink
>> configuration and how you run it would be of interest. That way we might
>> be able to see if we can tune Flink a bit more.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 14, 2017 at 5:23 AM, weijie tong wrote:
>>
>>> I have a test case that uses Flink's MutableHashTable class to do a hash
>>> join on a local machine with 64g memory, 64 cores. The test case is one
>>> build table with 14w rows, one probe table with 320w rows; the matched
>>> result rows is 12
Re: questions about Flink's HashJoin performance
@Till thanks for your reply.

My code is similar to HashTableITCase.testInMemoryMutableHashTable(). It just uses the MutableHashTable class; there's no other Flink configuration. The main code body is:

> this.recordBuildSideAccessor = RecordSerializer.get();
> this.recordProbeSideAccessor = RecordSerializer.get();
> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
> final Class[] keyType = (Class[]) new Class[]{BytesValue.class};
> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
> Sequence buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
> Sequence probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
> List memorySegments;
> int pageSize = hashTableMemoryManager.getTotalNumPages();
> try {
>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
> }
> catch (MemoryAllocationException e) {
>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>   Throwables.propagate(e);
>   return;
> }
> try {
>   Stopwatch stopwatch = Stopwatch.createStarted();
>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>   join = new MutableHashTable(
>       recordBuildSideAccessor,
>       recordProbeSideAccessor,
>       recordBuildSideComparator,
>       recordProbeSideComparator,
>       pactRecordComparator,
>       memorySegments,
>       ioManager
>   );
>   join.open(buildInput, probeInput);
>
>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");

The BytesValue type is a self-defined one which holds a byte[]; just like the original StringValue, it has the same serde performance.

while (join.nextRecord()) {
  Record currentProbeRecord = join.getCurrentProbeRecord();
  MutableObjectIterator buildSideIterator = join.getBuildSideIterator();
  while (buildSideIterator.next(reusedBuildSideRow) != null) {
    materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
    materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
    rowNum++;
  }
}

I have tried both the Record and Row classes as the record type without any performance improvement. I also tried batching the input records: the buildInput and probeInput variables of the first code block iterate one Record at a time from a batch of Records. The batched records' content stays in memory in Drill's ValueVector format. Once a record needs to participate in the build or probe phase via an iterator next() call, it is fetched from the batched in-memory ValueVector content. But no performance gains.

The top hotspot profile from JProfiler is below:

> Hot spot, "Self time (microseconds)", "Average Time", "Invocations"
> org.apache.flink.types.Record.serialize,                           1014127, n/a, n/a
> org.apache.flink.types.Record.deserialize,                           60684, n/a, n/a
> org.apache.flink.types.Record.copyTo,                                83007, n/a, n/a
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,       55238, n/a, n/a
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord, 10955, n/a, n/a
> org.apache.flink.runtime.memory.MemoryManager.release,               33484, n/a, n/a
> org.apache.flink.runtime.memory.MemoryManager.allocatePages,        104259, n/a, n/a

My log shows that the hashjoin.open() method costs too much time:

> construct hash table elapsed:1885ms

On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann wrote:

> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark base line results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might
> be able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong wrote:
>
>> I have a test case that uses Flink's MutableHashTable class to do a hash
>> join on a local machine with 64g memory, 64 cores. The test case is one
>> build table with 14w rows, one probe table with 320w rows; the matched
>> result rows is 12 w.
>>
>> It takes 2.2 seconds to complete the join. The performance seems bad. I
>> ensure there's no overflow; the smaller table is the build side. The
>> MutableObjectIterator is a sequence of Rows. The Row is composed of
>> several fields which are byte[]. Through my log, I find the
[jira] [Created] (FLINK-6590) Integrate generated tables into documentation
Chesnay Schepler created FLINK-6590:
---------------------------------

Summary: Integrate generated tables into documentation
Key: FLINK-6590
URL: https://issues.apache.org/jira/browse/FLINK-6590
Project: Flink
Issue Type: Sub-task
Components: Documentation
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Re: [DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)
I'll now kick off the RC creation process.

On Sun, May 14, 2017 at 7:14 PM, Stephan Ewen wrote:

> +1 for another RC once the shading fix is merged.
>
> Even if RC1 would have pending issues, users that want to help testing
> need a new RC, because (as Robert said) RC0's shading issues make it
> virtually unusable...
>
> On Fri, May 12, 2017 at 3:45 PM, Robert Metzger wrote:
>
>> I'm considering doing another non-voting RC1 on Monday with the
>> incremental checkpointing and maven fixes in (and of course as many
>> other things as possible). Once the other critical fixes are in
>> (ideally Tuesday or Wednesday), I'll create the first voting one.
>>
>> The current RC0 is almost unusable because of the incorrect shading.
>>
>> On Fri, May 12, 2017 at 2:09 PM, Greg Hogan wrote:
>>
>>> +1 for sticking to the code freeze deadline and building a new release
>>> candidate, but since the release is still two weeks off (5/26) I think
>>> it better to delay voting to give time for additional bug fixes.
>>>
>>>> On May 11, 2017, at 10:19 AM, Robert Metzger wrote:
>>>>
>>>> It seems that we found quite a large number of critical issues in the
>>>> first RC.
>>>>
>>>> - FLINK-6537 Umbrella issue for fixes to incremental snapshots (Stefan
>>>>   has a PR open to fix the critical ones)
>>>> - FLINK-6531 Deserialize checkpoint hooks with user classloader (has a
>>>>   pending PR)
>>>> - FLINK-6515 KafkaConsumer checkpointing fails because of ClassLoader
>>>>   issues (status unknown)
>>>> - FLINK-6514 Cannot start Flink Cluster in standalone mode (Stephan is
>>>>   on it)
>>>> - FLINK-6508 Include license files of packaged dependencies (Stephan is
>>>>   on it) + FLINK-6501 Make sure NOTICE files are bundled into shaded
>>>>   JAR files
>>>> - FLINK-6284 Incorrect sorting of completed checkpoints in
>>>>   ZooKeeperCompletedCheckpointStore (unknown)
>>>>
>>>> I would like to get these issues fixed by end of this week (Sunday), so
>>>> that I can create the first voting RC on Monday morning.
>>>> Please reject if you think we will not manage to get the stuff fixed
>>>> until then.
>>>>
>>>> On Thu, May 11, 2017 at 10:54 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Unfortunately, it won't be fully functional in 1.3.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, May 11, 2017 at 10:45 AM, Renjie Liu <liurenjie2...@gmail.com> wrote:
>>>>>
>>>>>> @Rohrmann Will FLIP 6 be fully functional in 1.3 release?
>>>>>>
>>>>>> On Thu, May 11, 2017 at 4:12 PM Gyula Fóra wrote:
>>>>>>
>>>>>>> Thanks Stefan!
>>>>>>> Gyula
>>>>>>>
>>>>>>> Stefan Richter wrote (on Thu, 11 May 2017, 10:04):
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for reporting this. I found a couple of issues yesterday and
>>>>>>>> I am currently working on a bundle of fixes. I will take a look at
>>>>>>>> this problem, and if it is already covered.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>>> On 11.05.2017 at 09:47, Gyula Fóra wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I am not sure if this belongs to this thread, but while trying to
>>>>>>>>> run a job with the rocks incremental backend I ran into 2 issues:
>>>>>>>>>
>>>>>>>>> One with savepoints I can't figure out, because I can't make sense
>>>>>>>>> of the error or how it happened:
>>>>>>>>> The error stack trace is here:
>>>>>>>>> https://gist.github.com/gyfora/2f7bb387bbd9f455f9702908cde0b239
>>>>>>>>> This happens on every savepoint attempt and seems to be related to
>>>>>>>>> the kafka source. Interestingly, other tasks succeed in writing
>>>>>>>>> data to hdfs.
>>>>>>>>>
>>>>>>>>> The other one is covered by
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-6531 I guess. I am not
>>>>>>>>> sure if the first one is related though.
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> Till Rohrmann wrote (on Thu, 11 May 2017, 9:14):
>>>>>>>>>
>>>>>>>>>> Hi Renjie,
>>>>>>>>>>
>>>>>>>>>> 1.3 already contains some Flip-6 code. However, it is not yet
>>>>>>>>>> fully functional. What you already can do is to run local jobs on
>>>>>>>>>> the Flip-6 code base by instantiating a MiniCluster and then
>>>>>>>>>> using the Flip6LocalStreamEnvironment.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu,
[jira] [Created] (FLINK-6589) ListSerializer should deserialize as ArrayList with size + 1
Fabian Hueske created FLINK-6589:
---------------------------------

Summary: ListSerializer should deserialize as ArrayList with size + 1
Key: FLINK-6589
URL: https://issues.apache.org/jira/browse/FLINK-6589
Project: Flink
Issue Type: Improvement
Components: Core
Affects Versions: 1.3.0, 1.4.0
Reporter: Fabian Hueske

The {{ListSerializer}} deserializes a list as an {{ArrayList}} with exactly the required capacity, i.e., the number of serialized objects.

Several operators in the Table API have a {{MapState}} to store received elements in a list per timestamp. Hence, retrieving the list and adding one element to it is a very common operation. Since the deserialized list has no room left for adding elements, the first insertion will grow the {{ArrayList}}, which is expensive.

I propose to initialize the {{ArrayList}} returned by the {{ListSerializer}} with numberOfSerializedElements + 1. This only marginally increases the size of the list and allows for one insertion without growing it.
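For illustration, a sketch of the proposed deserialization logic (simplified; not the actual Flink source):

{code}
// inside a ListSerializer<T>-like class with an elementSerializer field
public List<T> deserialize(DataInputView source) throws IOException {
    final int size = source.readInt();
    // one extra slot so the common "deserialize list, then add one element"
    // pattern does not immediately trigger an ArrayList resize
    final List<T> list = new ArrayList<>(size + 1);
    for (int i = 0; i < size; i++) {
        list.add(elementSerializer.deserialize(source));
    }
    return list;
}
{code}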
[jira] [Created] (FLINK-6588) Rename NumberOfFullRestarts metric
Chesnay Schepler created FLINK-6588:
---------------------------------

Summary: Rename NumberOfFullRestarts metric
Key: FLINK-6588
URL: https://issues.apache.org/jira/browse/FLINK-6588
Project: Flink
Issue Type: Improvement
Components: Metrics
Affects Versions: 1.3.0, 1.4.0
Reporter: Chesnay Schepler

The metric for the number of full restarts is currently called {{fullRestarts}}. For clarity and consistency purposes I propose to rename it to {{numFullRestarts}}.
[jira] [Created] (FLINK-6587) Java Table API cannot parse function names starting with keywords
Timo Walther created FLINK-6587:
---------------------------------

Summary: Java Table API cannot parse function names starting with keywords
Key: FLINK-6587
URL: https://issues.apache.org/jira/browse/FLINK-6587
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

The ExpressionParser of the Java Table API has problems with functions whose names start with a reserved keyword. E.g., a function must not be called {{summing}} because {{sum}} is reserved.
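For illustration, a minimal sketch (the function and field names are hypothetical):

{code}
// hypothetical scalar function whose name starts with the reserved keyword "sum"
tEnv.registerFunction("summing", new SummingFunction());

// fails today: the expression parser greedily matches the keyword "sum"
// instead of the registered function name "summing"
table.select("summing(a)");
{code}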
[jira] [Created] (FLINK-6586) InputGateMetrics#refreshAndGetMin returns Integer.MAX_VALUE for local channels
Chesnay Schepler created FLINK-6586:
---------------------------------

Summary: InputGateMetrics#refreshAndGetMin returns Integer.MAX_VALUE for local channels
Key: FLINK-6586
URL: https://issues.apache.org/jira/browse/FLINK-6586
Project: Flink
Issue Type: Bug
Components: Metrics, Network
Affects Versions: 1.3.0, 1.4.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Trivial
Fix For: 1.3.0, 1.4.0

{{InputGateMetrics#refreshAndGetMin}} returns {{Integer.MAX_VALUE}} when working with {{LocalChannels}}. In this case it should return 0 instead.
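An illustrative sketch of the suspected pattern (not the actual Flink source):

{code}
int min = Integer.MAX_VALUE;
for (InputChannel channel : channels) {
    // only remote channels expose a queued-buffer count
    if (channel instanceof RemoteInputChannel) {
        min = Math.min(min, ((RemoteInputChannel) channel).getNumberOfQueuedBuffers());
    }
}
// with only local channels the loop never updates 'min', so Integer.MAX_VALUE
// leaks out; the proposed behavior would be:
return min == Integer.MAX_VALUE ? 0 : min;
{code}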
[jira] [Created] (FLINK-6585) Table examples are not runnable in IDE
Timo Walther created FLINK-6585:
---------------------------------

Summary: Table examples are not runnable in IDE
Key: FLINK-6585
URL: https://issues.apache.org/jira/browse/FLINK-6585
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther

Running the Table API examples in {{flink-examples-table}} fails with:

{code}
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.TableEnvironment
{code}

Seems to be a Maven issue.
[jira] [Created] (FLINK-6584) Support multiple consecutive windows in SQL
Timo Walther created FLINK-6584:
---------------------------------

Summary: Support multiple consecutive windows in SQL
Key: FLINK-6584
URL: https://issues.apache.org/jira/browse/FLINK-6584
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

Right now, the Table API supports multiple consecutive windows as follows:

{code}
val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)

val t = table
  .window(Tumble over 2.millis on 'rowtime as 'w)
  .groupBy('w)
  .select('w.rowtime as 'rowtime, 'int.count as 'int)
  .window(Tumble over 4.millis on 'rowtime as 'w2)
  .groupBy('w2)
  .select('w2.rowtime, 'w2.end, 'int.count)
{code}

Similar behavior should be supported by the SQL API as well. We need to introduce a new auxiliary group function, but this should happen in sync with Apache Calcite.
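One possible shape of the SQL counterpart, purely as a sketch: the auxiliary group function (called {{TUMBLE_ROWTIME}} here) does not exist yet, so its name and semantics are hypothetical:

{code}
// sketch only: TUMBLE_ROWTIME would expose the window's rowtime
// so that the outer query can window over it again
Table t = tEnv.sql(
    "SELECT TUMBLE_END(rt, INTERVAL '4' MILLISECOND), SUM(cnt) " +
    "FROM (" +
    "  SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '2' MILLISECOND) AS rt," +
    "         COUNT(i) AS cnt " +
    "  FROM t1 " +
    "  GROUP BY TUMBLE(rowtime, INTERVAL '2' MILLISECOND)" +
    ") GROUP BY TUMBLE(rt, INTERVAL '4' MILLISECOND)");
{code}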
Re: questions about Flink's HashJoin performance
Hi Weijie,

it might be the case that batching the processing of multiple rows can give you an improved performance compared to single row processing.

Maybe you could share the exact benchmark base line results and the code you use to test Flink's MutableHashTable with us. Also the Flink configuration and how you run it would be of interest. That way we might be able to see if we can tune Flink a bit more.

Cheers,
Till

On Sun, May 14, 2017 at 5:23 AM, weijie tong wrote:

> I have a test case that uses Flink's MutableHashTable class to do a hash
> join on a local machine with 64g memory, 64 cores. The test case is one
> build table with 14w rows, one probe table with 320w rows; the matched
> result rows is 12 w.
>
> It takes 2.2 seconds to complete the join. The performance seems bad. I
> ensure there's no overflow; the smaller table is the build side. The
> MutableObjectIterator is a sequence of Rows. The Row is composed of
> several fields which are byte[]. Through my log, I find the open() method
> takes 1.560 seconds. The probe iterate phase takes 680ms. And my
> JProfiler profile shows the MutableObjectIterator's next() method call is
> the hotspot.
>
> I want to know how to tune this scenario. I find Drill's HashJoin is
> batch-model. Its build side's input is a RecordBatch which holds a batch
> of rows whose memory size is close to the L2 cache. Through this strategy
> it gains fewer method calls (that means calls to next()) and is much more
> efficient for CPU calculation. I also find that SQL Server's paper noted
> the batch model's performance gains
> (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
> I guess the performance drop is due to the single-row iterate model.
>
> Hope someone corrects my opinion. Also, maybe I have a wrong use of the
> MutableHashTable. Waiting for someone to give advice.
[jira] [Created] (FLINK-6583) Enable QueryConfig in count base GroupWindow
sunjincheng created FLINK-6583:
---------------------------------

Summary: Enable QueryConfig in count base GroupWindow
Key: FLINK-6583
URL: https://issues.apache.org/jira/browse/FLINK-6583
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Affects Versions: 1.3.0, 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng

Enable QueryConfig in count-based GroupWindows by adding a custom trigger, {{CountTriggerWithCleanupState}}. See FLINK-6491 for more details.
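A rough sketch of what this enables, assuming the 1.3-era QueryConfig API (table and field names are hypothetical):

{code}
// state of idle keys may be cleaned up between the min and max retention time;
// the new CountTriggerWithCleanupState would make count windows respect this
StreamQueryConfig qConf = tEnv.queryConfig()
    .withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// a count-based group window: 10 rows per key
Table result = table
    .window(Tumble.over("10.rows").on("proctime").as("w"))
    .groupBy("w, user")
    .select("user, amount.sum");

tEnv.toAppendStream(result, Row.class, qConf);
{code}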
[jira] [Created] (FLINK-6582) Maven archetype is not buildable by default due to ${scala.binary.version}
Dawid Wysakowicz created FLINK-6582:
---------------------------------

Summary: Maven archetype is not buildable by default due to ${scala.binary.version}
Key: FLINK-6582
URL: https://issues.apache.org/jira/browse/FLINK-6582
Project: Flink
Issue Type: Bug
Components: Build System, Quickstarts
Affects Versions: 1.3.0
Reporter: Dawid Wysakowicz

When creating a Java project from the Maven archetype, the Flink dependencies are unresolvable due to the ${scala.binary.version} placeholder.
[jira] [Created] (FLINK-6581) Dynamic property parsing broken for YARN
Till Rohrmann created FLINK-6581:
---------------------------------

Summary: Dynamic property parsing broken for YARN
Key: FLINK-6581
URL: https://issues.apache.org/jira/browse/FLINK-6581
Project: Flink
Issue Type: Bug
Components: YARN
Affects Versions: 1.3.0, 1.4.0
Reporter: Till Rohrmann

The dynamic property parsing in YARN is broken. For example, the following dynamic property won't be parsed properly: {{-yDenv.java.opts="-DappName=foobar -Did=1"}}. The result will be equivalent to {{-yDenv.java.opts="-DappName"}}.

The problem is that {{FlinkYarnSessionCli#getDynamicProperties}} assumes that all dynamic properties have the form {{-D<key>=<value>}} where {{<value>}} does not contain any {{=}}.

I propose to change the parsing of the dynamic properties string such that the key-value pair is split not via {{dynamicProperties.split("=")}} but at the first occurrence of {{=}}.
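A sketch of the proposed parsing, based on the issue description:

{code}
String prop = "env.java.opts=-DappName=foobar -Did=1";

// split at the FIRST '=' only, instead of prop.split("=")
int idx = prop.indexOf('=');
String key = prop.substring(0, idx);      // "env.java.opts"
String value = prop.substring(idx + 1);   // "-DappName=foobar -Did=1"

// the current split("=")-based approach effectively yields
// ["env.java.opts", "-DappName", "foobar -Did", "1"] and truncates the value
{code}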
[jira] [Created] (FLINK-6580) Flink on YARN doesn't start with default parameters
Robert Metzger created FLINK-6580:
---------------------------------

Summary: Flink on YARN doesn't start with default parameters
Key: FLINK-6580
URL: https://issues.apache.org/jira/browse/FLINK-6580
Project: Flink
Issue Type: Bug
Components: YARN
Affects Versions: 1.3.0
Reporter: Robert Metzger
Assignee: Robert Metzger
Priority: Blocker

Just doing {{./bin/yarn-session.sh -n 1}} fails with

{code}
Error while deploying YARN cluster: Couldn't deploy Yarn cluster
java.lang.RuntimeException: Couldn't deploy Yarn cluster
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:436)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:626)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:482)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:479)
	at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:479)
Caused by: java.lang.IllegalArgumentException: The configuration value 'containerized.heap-cutoff-min' is higher (600) than the requested amount of memory 256
	at org.apache.flink.yarn.Utils.calculateHeapSize(Utils.java:100)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.setupApplicationMasterContainer(AbstractYarnClusterDescriptor.java:1263)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:803)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:568)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:434)
	... 9 more
{code}

I think this issue was introduced in FLINK-5904: Flink on YARN now uses the configuration parameters from the configuration file.
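Roughly what the failing check does (a simplified sketch inferred from the stack trace, not the actual {{Utils#calculateHeapSize}} source):

{code}
long cutoffMin = config.getLong("containerized.heap-cutoff-min", 600);
int requestedMemoryMB = 256; // the small default the CLI now picks up from the config file
if (cutoffMin > requestedMemoryMB) {
    throw new IllegalArgumentException(
        "The configuration value 'containerized.heap-cutoff-min' is higher ("
            + cutoffMin + ") than the requested amount of memory " + requestedMemoryMB);
}
{code}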
[jira] [Created] (FLINK-6579) Add proper support for BasicArrayTypeInfo
Timo Walther created FLINK-6579:
---------------------------------

Summary: Add proper support for BasicArrayTypeInfo
Key: FLINK-6579
URL: https://issues.apache.org/jira/browse/FLINK-6579
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

FLINK-6033 added only partial support for arrays of Java wrapper classes (e.g. {{Integer[]}}). In most cases operations fail.
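A minimal sketch of the kind of input that hits this limitation (assuming the 1.3-era batch Table API; {{at}} is the Table API's array element access):

{code}
// Integer[] is typed as BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
DataSet<Integer[]> input = env.fromElements(
    new Integer[]{1, 2, 3},
    new Integer[]{4, 5});

Table t = tEnv.fromDataSet(input, "arr");
Table selected = t.select("arr.at(1)"); // most operations on the array field fail
{code}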