[jira] [Created] (FLINK-10644) Batch Job: Speculative execution
JIN SUN created FLINK-10644: --- Summary: Batch Job: Speculative execution Key: FLINK-10644 URL: https://issues.apache.org/jira/browse/FLINK-10644 Project: Flink Issue Type: New Feature Components: JobManager Reporter: JIN SUN Assignee: JIN SUN Fix For: 1.8.0 Stragglers/outliers are tasks that run slower than most of the other tasks in a batch job. This can hurt job latency, since a straggler often sits on the critical path of the job and becomes the bottleneck. Tasks may be slow for various reasons, including hardware degradation, software misconfiguration, or noisy neighbors, so it is hard for the JM to predict their runtime. To reduce the impact of stragglers, other systems such as Hadoop/Tez and Spark have *_speculative execution_*. Speculative execution is a health-check procedure that looks for tasks to speculate, i.e. tasks in an ExecutionJobVertex running slower than the median of all successfully completed tasks in that EJV. Such slow tasks will be re-submitted to another TM. The slow task is not stopped; instead a new copy runs in parallel, and once one of them completes, the others are killed. This JIRA is an umbrella for applying this idea in Flink. Details will be appended later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
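The median-based straggler check described above could be sketched roughly as follows. This is illustrative only: the class, method names, and the slow-factor threshold are hypothetical, not actual Flink APIs.

```java
import java.util.Arrays;

// Hypothetical sketch of the speculation check: a running task is a
// speculation candidate if its running time exceeds the median running
// time of successfully completed tasks in the same ExecutionJobVertex
// by some configurable factor. Names here are illustrative, not Flink APIs.
public class SpeculationCheck {

    static double medianOf(long[] completedRuntimesMillis) {
        long[] sorted = completedRuntimesMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /** Returns true if the running task should get a speculative copy. */
    static boolean shouldSpeculate(long runningTimeMillis,
                                   long[] completedRuntimesMillis,
                                   double slowFactor) {
        // Without completed tasks there is no baseline to compare against.
        if (completedRuntimesMillis.length == 0) {
            return false;
        }
        return runningTimeMillis > slowFactor * medianOf(completedRuntimesMillis);
    }

    public static void main(String[] args) {
        long[] completed = {100, 110, 120, 130, 500};
        // Median is 120; with slowFactor 1.5 the threshold is 180.
        System.out.println(shouldSpeculate(600, completed, 1.5)); // true
        System.out.println(shouldSpeculate(150, completed, 1.5)); // false
    }
}
```

Using the median rather than the mean keeps a single very slow completed task (500 above) from inflating the baseline.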
[jira] [Created] (FLINK-10643) Bubble execution: Resource aware job execution
JIN SUN created FLINK-10643: --- Summary: Bubble execution: Resource aware job execution Key: FLINK-10643 URL: https://issues.apache.org/jira/browse/FLINK-10643 Project: Flink Issue Type: New Feature Components: JobManager Reporter: JIN SUN Assignee: JIN SUN Fix For: 1.8.0 Attachments: image-2018-10-22-16-28-32-355.png Today Flink supports various channels such as pipelined channels and blocking channels. A blocking channel indicates that data is persisted in a batch and consumed later; the downstream task cannot start processing data until its producer has finished, and the downstream task then depends only on this intermediate partition instead of on the upstream tasks. By leveraging this characteristic, Flink already supports fine-grained failover, which builds failover regions to reduce failover cost. However, we can leverage this characteristic even more. As described in this [paper|http://www.vldb.org/pvldb/vol11/p746-yin.pdf] (VLDB 2018), *_Bubble Execution_* uses this characteristic not only to implement fine-grained failover, but also to balance resource utilization and job performance. As shown in the paper (and in the following chart), with 50% of the resources it gets a 25% average slowdown (0.75x speedup) on the TPC-H benchmark. !image-2018-10-22-16-28-32-355.png! This JIRA is an umbrella that tries to apply the ideas of this paper to Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Fwd: reaching out from k8s Big Data SIG
Hi Flink community, I serve as chair on the Kubernetes Big Data SIG, and we've had some interest from the SIG in learning more about what the Flink community is working on in the kubernetes space. I'm putting out feelers to see if anybody from the community would be interested in giving a brief talk and/or demo of Flink integrations with k8s at one of our regular meetings. The Big Data SIG holds bi-weekly meetings at 10am Pacific time (next meeting is Oct 31). Feel free to attend our meetings, or visit our slack channel: https://kubernetes.slack.com/messages/C0ELB338T/ More SIG info is available on our regular meeting minutes doc: https://docs.google.com/document/d/1pnF38NF6N5eM8DlK088XUW85Vms4V2uTsGZvSp8MNIA/ Thanks! Erik Erlandson
[jira] [Created] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1
xueyu created FLINK-10642: - Summary: CodeGen split fields errors when maxGeneratedCodeLength equals 1 Key: FLINK-10642 URL: https://issues.apache.org/jira/browse/FLINK-10642 Project: Flink Issue Type: Bug Components: Table API SQL Affects Versions: 1.6.1 Reporter: xueyu Assignee: xueyu Several tests fail when maxGeneratedCodeLength is set to 1, e.g.:
CalcITCase.testFilterOnCustomType:260 ? InvalidProgram Table program cannot be...
JavaTableEnvironmentITCase.testAsFromAndToPojo:394 ? InvalidProgram Table prog...
JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 ? InvalidProgram
JavaTableEnvironmentITCase.testAsFromPojo:288 ? InvalidProgram Table program c...
JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 ? InvalidProgram Ta...
JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 ? InvalidProgram ...
TimeAttributesITCase.testPojoSupport:566 ? JobExecution Job execution failed.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10641) Slow when applying new containers
Jiayi Liao created FLINK-10641: -- Summary: Slow when applying new containers Key: FLINK-10641 URL: https://issues.apache.org/jira/browse/FLINK-10641 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.6.1 Reporter: Jiayi Liao Assignee: Jiayi Liao When requesting containers from YARN, the containers are received and returned over and over again like this:
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1535124617388_1936_01_000929 - Remaining pending container requests: 0
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1535124617388_1936_01_000929.
Sometimes this lasts several minutes, which is longer than we expect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: Enable Slot Resource Profile for Resource Management
Thanks, Tison~ Thank You, Tony Xintong Song -- From: Tzu-Li Chen Date: 2018-10-22 18:45:36 To: ; 宋辛童(五藏) Subject: Re: Enable Slot Resource Profile for Resource Management Hi Tony, I see the corresponding JIRA[1], and it looks like you didn't attach it to the mailing list; I have done that for you. I also wonder whether you want to ask for the contributor permission so the JIRA can be assigned to you. On the topic itself, I left some comments on the JIRA; briefly, I am open to taking advantage of the currently somewhat ignored ResourceProfile APIs. Anyway, welcome on board! Best, tison. [1] https://issues.apache.org/jira/browse/FLINK-10640 宋辛童(五藏) wrote on Mon, Oct 22, 2018 at 6:39 PM: Hi all, We are planning to do some work related to Flink’s resource management. Specifically, we are trying to enable ResourceProfile-based resource management. Here is a brief description of our key ideas. Please let me know what you think. Thank You, Tony Xintong Song
Re: Enable Slot Resource Profile for Resource Management
Hi Tony, I see the corresponding JIRA[1], and it looks like you didn't attach it to the mailing list; I have done that for you. I also wonder whether you want to ask for the contributor permission so the JIRA can be assigned to you. On the topic itself, I left some comments on the JIRA; briefly, I am open to taking advantage of the currently somewhat ignored ResourceProfile APIs. Anyway, welcome on board! Best, tison. [1] https://issues.apache.org/jira/browse/FLINK-10640 宋辛童(五藏) wrote on Mon, Oct 22, 2018 at 6:39 PM: > Hi all, > > We are planning to do some work related to > Flink’s resource management. Specifically, we are trying to enable > ResourceProfile-based resource management. > Here is a brief description of our key ideas. Please let me know what you > think. > > Thank You, > Tony Xintong Song >
Enable Slot Resource Profile for Resource Management
Hi all, We are planning to do some work related to Flink’s resource management. Specifically, we are trying to enable ResourceProfile-based resource management. Here is a brief description of our key ideas. Please let me know what you think. Thank You, Tony Xintong Song
[jira] [Created] (FLINK-10640) Enable Slot Resource Profile for Resource Management
Xintong Song created FLINK-10640: Summary: Enable Slot Resource Profile for Resource Management Key: FLINK-10640 URL: https://issues.apache.org/jira/browse/FLINK-10640 Project: Flink Issue Type: New Feature Components: ResourceManager Reporter: Xintong Song Motivation & Background * The existing concept of task slots roughly represents how many pipelines of tasks a TaskManager can hold. However, it does not consider the differences in resource needs and usage of individual tasks. Enabling resource profiles of slots may allow Flink to better allocate execution resources according to tasks' fine-grained resource needs. * The community version of Flink already contains APIs and some implementation for slot resource profiles. However, this logic is not truly used. (The ResourceProfile of slot requests is by default set to UNKNOWN with negative values, and thus matches any given slot.) Preliminary Design * Slot Management A slot represents a certain amount of resources on a TaskManager for a single pipeline of tasks to run in. Initially, a TaskManager does not have any slots, only a total amount of resources. When allocating, the ResourceManager finds proper TMs and generates new slots for the tasks to run in, according to the slot requests. Once generated, a slot's size (resource profile) does not change until it is freed. The ResourceManager can apply different, portable strategies to allocate slots from TaskManagers. * TM Management The size and number of TaskManagers, and when to start them, can also be flexible. TMs can be started and released dynamically and may have different sizes. We may have many different, portable strategies, e.g., an elastic session that can run multiple jobs like the session mode while dynamically adjusting the size of the session (number of TMs) according to the real-time workload.
* About Slot Sharing Slot sharing is a good heuristic for easily calculating how many slots are needed to get the job running, and for getting better utilization when slots have no resource profiles. However, with resource profiles enabling finer-grained resource management, each individual task has its specific resource needs, and it does not make much sense to have multiple tasks share the resources of the same slot. Instead, we may introduce locality preferences/constraints to support the semantics of putting tasks in the same/different TMs in a more general way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
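The allocation model sketched in the design above (TMs start with a pool of resources, and slots are carved out on demand to match each request's profile) could look roughly like the following. All class names, fields, and the first-fit strategy are illustrative stand-ins, not Flink's actual classes.

```java
import java.util.List;

// Illustrative sketch (not Flink's actual classes) of the design above:
// a TaskManager starts with only a total amount of resources, and the
// ResourceManager carves slots out of a TM's free resources to match
// each slot request's resource profile.
public class SlotAllocationSketch {

    static class ResourceProfile {
        final double cpuCores;
        final int memoryMB;

        ResourceProfile(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }

        /** True if the given free resources can cover this request. */
        boolean isMatchedBy(ResourceProfile available) {
            return available.cpuCores >= cpuCores && available.memoryMB >= memoryMB;
        }
    }

    static class TaskManager {
        double freeCpu;
        int freeMemoryMB;

        TaskManager(double cpu, int memMB) {
            freeCpu = cpu;
            freeMemoryMB = memMB;
        }

        /** Carve a new slot out of the TM's free resources, or return false. */
        boolean allocateSlot(ResourceProfile request) {
            if (request.isMatchedBy(new ResourceProfile(freeCpu, freeMemoryMB))) {
                freeCpu -= request.cpuCores;
                freeMemoryMB -= request.memoryMB;
                return true;
            }
            return false;
        }
    }

    /** One possible (first-fit) strategy: pick the first TM with enough free resources. */
    static int allocate(List<TaskManager> tms, ResourceProfile request) {
        for (int i = 0; i < tms.size(); i++) {
            if (tms.get(i).allocateSlot(request)) {
                return i;
            }
        }
        return -1; // no TM can host the slot; a new TM could be started here
    }
}
```

The "portable strategies" mentioned in the design would plug in where `allocate` is: first-fit, best-fit, or spreading could all be swapped in without changing the slot model.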
Re: [DISCUSS] Improve broadcast serialization
Sounds good to me :) Piotrek > On 19 Oct 2018, at 08:34, Zhijiang(wangzhijiang999) > wrote: > > I agree with the additional thoughts of a), b) and c). > > In all the current implementations of ChannelSelector, the selected channels > are either one or all, so it makes sense to change the interface as you > suggested if we will not extend other selectors for partial channels in the > future. And the single channel implementation would reduce some overhead in > arrays and loops. For the broadcast selector, there is no need to return channels > from the selector and we can make a shortcut for this special > implementation. > > Comparing 3 vs 5, I still prefer 3 currently, which can reuse the current > network process. We only create one BufferBuilder for all the channels and > build a separate BufferConsumer for every channel sharing the same > BufferBuilder. To do so, we just need a few changes on the RecordWriter side and do > not touch the following components in the network stack. And it will already gain > most of the performance benefits, since it copies the serialization > temporary buffer only once into one BufferBuilder. > > I can first create the JIRA for the single channel interface if you have not done > that before, and then continue with the copying step by step. :) > > Best, > Zhijiang > -- > From: Piotr Nowojski > Sent: Thursday, 18 October 2018, 17:47 > To: Zhijiang(wangzhijiang999) > Cc: Nico Kruber ; dev > Subject: Re: [DISCUSS] Improve broadcast serialization > > Hey, > > I also think that the 3rd option is the most promising, however the logic of “dirty” > channels might be causing some overhead. I was also thinking about another > option: > > 5. In case of ‘emit’ called on BroadcastRecordWriter, we could write the record to the > common/shared BufferBuilder, but somehow mark it as targeted to only one > channel - we would send it over the network to all of the receivers, but all > except one would ignore it.
This might be easier to implement in > BroadcastRecordWriter, but would require extra logic on the receiver side. > With respect to performance it also might be better compared to 3. > > A couple more thoughts: > > a) if we select BroadcastRecordWriter, literally the only way it can be > polluted by non-broadcast writes is latency markers via `randomEmit`. When > choosing 3 vs 5: mixing broadcast and non-broadcast happens very rarely, so > we shouldn’t optimise for it, but pick whatever is easiest to implement. > b) there are no use cases where `ChannelSelector` returns anything else > besides a single channel or broadcast. > > Point b) brings me to one more thing. I was once playing with simplifying the > `ChannelSelector` interface by adding a new `SingleChannelSelector` with the > method: > > `int selectChannel(T record, int numChannels);` > > And it resulted in a ~10% performance speed-up for the network stack alone > (overhead of creating singleton arrays and iterating over them). I didn’t > follow up on this, because the performance gain wasn’t huge, while it > complicated `RecordWriter`, since it had to handle either > `SingleChannelSelector` or `ChannelSelector`. Now that I have realised that there > are no use cases for selecting more than one, but not all, of the channels, and > that since we go with broadcasting we will have to special-case > `BroadcastPartitioner` anyway, this is the perfect occasion to actually simplify the > implementation and drop the multi-channel ChannelSelector. > > I think we should do this as a first step, in preparation for either 3. > or 5. (changing the ChannelSelector signature to: > > int selectChannel(T record, int numChannels); > > ) > > What do you think? > > Piotrek > > On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) > wrote: > Hi Piotr, > > Thanks for your replies and suggestions!
> > For my rough idea of a skip index list, I agree with your concerns about > performance for the non-broadcast case and the complicated implementation. Although I > think this idea seems more unified in semantics for the "emit", "broadcastEmit" > and "randomEmit" APIs, maybe it is not worth going deep into it currently, given the > global changes. > > Currently RecordWriter provides three main methods to write elements with > different semantics: > > "broadcastEmit" writes the element to all the channels; currently used for > watermarks. > "randomEmit" writes the element to one random channel; currently used for latency > markers. > "emit" writes the element to some channels via the ChannelSelector; used for > normal records currently. The selected channels may be one, some or all. > > If we want to retain these APIs for different requirements, then the > RecordWriter should not be aware of which kind of elements are written > via these APIs, so we should not make any assumptions in the implementation. In > detail, I know "randomEmit" is only used for latency markers currently,
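The `SingleChannelSelector` simplification Piotr proposes in this thread could be sketched as follows. The interface names mirror the discussion, but everything else here is an illustrative stand-in, not Flink's actual network-stack code.

```java
// Sketch of the interface simplification from the thread above: replace
// the array-returning selector with one that returns a single channel
// index, and special-case broadcast instead of returning all channels.
// Names follow the discussion; details of the real Flink interface differ.
public class ChannelSelectorSketch {

    /** Old style: may return several channels (in practice: one, or all). */
    interface MultiChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    /**
     * Proposed style: exactly one channel per record, avoiding the
     * singleton-array allocation and iteration on the hot path.
     */
    interface SingleChannelSelector<T> {
        int selectChannel(T record, int numChannels);
    }

    /** Example: key-hash partitioning expressed as a SingleChannelSelector. */
    static SingleChannelSelector<String> hashPartitioner() {
        // Mask off the sign bit so negative hash codes still map to a valid channel.
        return (record, numChannels) -> (record.hashCode() & 0x7fffffff) % numChannels;
    }

    public static void main(String[] args) {
        SingleChannelSelector<String> selector = hashPartitioner();
        int channel = selector.selectChannel("some-key", 4);
        System.out.println(channel >= 0 && channel < 4); // always within range
    }
}
```

Broadcast then becomes a property of the writer (e.g. a dedicated BroadcastRecordWriter) rather than a selector returning every channel, which is what makes dropping the multi-channel interface possible.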
[jira] [Created] (FLINK-10639) Fix java syntax error in document
sunjincheng created FLINK-10639: --- Summary: Fix java syntax error in document Key: FLINK-10639 URL: https://issues.apache.org/jira/browse/FLINK-10639 Project: Flink Issue Type: Bug Components: Documentation, Table API SQL Affects Versions: 1.6.1, 1.7.0 Reporter: sunjincheng Assignee: Hequn Cheng Fix For: 1.7.0, 1.6.1 Attachments: image-2018-10-22-16-54-10-305.png Because [StreamTableSourceFactory|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/StreamTableSourceFactory.scala] is a trait, the Java example in the document should use the "implements" keyword. !image-2018-10-22-16-54-10-305.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
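A Scala trait compiles to a Java interface, so Java code must use `implements` rather than `extends` for it, which is the fix this issue describes. The snippet below uses a local stand-in interface (not Flink's actual `StreamTableSourceFactory`) so it stays self-contained:

```java
// A Scala trait compiles to a Java interface, so a Java class must use
// "implements" (not "extends") for it. The interface below is a stand-in
// for the Scala trait, to keep this example runnable without Flink.
public class ImplementsExample {

    interface TableSourceFactory { // stand-in for the Scala trait
        String createTableSource();
    }

    // Correct: a Java class *implements* the interface; "extends" would not compile.
    static class MyTableSourceFactory implements TableSourceFactory {
        @Override
        public String createTableSource() {
            return "my-source";
        }
    }

    public static void main(String[] args) {
        System.out.println(new MyTableSourceFactory().createTableSource()); // my-source
    }
}
```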
[jira] [Created] (FLINK-10638) Invalid table scan resolution for temporal join queries
Timo Walther created FLINK-10638: Summary: Invalid table scan resolution for temporal join queries Key: FLINK-10638 URL: https://issues.apache.org/jira/browse/FLINK-10638 Project: Flink Issue Type: Bug Components: Table API SQL Reporter: Timo Walther Assignee: Timo Walther Registered tables that contain a temporal join are not properly resolved when performing a table scan. {code} LogicalProject(amount=[*($0, $4)]) LogicalFilter(condition=[=($3, $1)]) LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;]) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10637) Start MiniCluster with random REST port
Till Rohrmann created FLINK-10637: - Summary: Start MiniCluster with random REST port Key: FLINK-10637 URL: https://issues.apache.org/jira/browse/FLINK-10637 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.6.1, 1.5.4, 1.7.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.6, 1.6.3, 1.7.0 The {{MiniCluster}} picks a random port for the {{RpcService}} but not for the REST server endpoint. Due to this it falls back to {{8081}}. This can lead to port conflicts if tests are executed concurrently. I propose to rename the {{MiniClusterResource}} into {{MiniClusterResourceWithRestClient}} and add a new {{MiniClusterResource}} which only starts a {{MiniCluster}} with the REST port set to 0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
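Setting a server port to 0 is the standard way to ask the OS for a free ephemeral port, which is what avoids the conflicts on 8081 described above. The plain `java.net` snippet below illustrates the convention; it is a generic sketch, not Flink's MiniCluster code:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Binding to port 0 delegates the port choice to the OS, which assigns a
// currently free ephemeral port. This is the mechanism behind starting
// the REST endpoint with its port set to 0, as proposed in the issue.
public class RandomPortExample {

    /** Bind to port 0 and report which concrete port the OS handed out. */
    static int pickFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("could not bind to an ephemeral port", e);
        }
    }

    public static void main(String[] args) {
        int port = pickFreePort();
        System.out.println(port > 0); // true: the OS assigned a concrete free port
    }
}
```

Two concurrently started servers each binding to port 0 receive different ports, so tests running in parallel no longer collide.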