[jira] [Created] (FLINK-10644) Batch Job: Speculative execution

2018-10-22 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10644:
---

 Summary: Batch Job: Speculative execution
 Key: FLINK-10644
 URL: https://issues.apache.org/jira/browse/FLINK-10644
 Project: Flink
  Issue Type: New Feature
  Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN
 Fix For: 1.8.0


Stragglers/outliers are tasks that run slower than most other tasks in a batch 
job. They hurt job latency, because a straggler is usually on the critical path 
of the job and becomes its bottleneck.

Tasks may be slow for various reasons, including hardware degradation, software 
misconfiguration, or noisy neighbors. It is hard for the JM to predict their 
runtime.

To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark 
have *_speculative execution_*. Speculative execution is a health-check 
procedure that looks for tasks to speculate on, i.e. tasks in an 
ExecutionJobVertex (EJV) that run slower than the median of all successfully 
completed tasks in that EJV. Such slow tasks are re-submitted to another TM. The 
slow task is not stopped; instead, a new copy runs in parallel, and once one of 
the copies completes, the others are killed.
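The detection rule and the "run a copy, keep the first result" behavior described above can be sketched in Java as follows. This is a minimal, hypothetical illustration, not Flink code: `StragglerDetector`, `SpeculativeRunner`, the millisecond inputs and the `multiplier` threshold are assumptions introduced here. The second class uses the JDK's `ExecutorService.invokeAny`, which returns the result of the first task to complete successfully and cancels the rest; a real scheduler would place the copy on a different TaskManager rather than on a local thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical median-based straggler detection for the tasks of one EJV. */
final class StragglerDetector {
    private final double multiplier; // assumed knob: how much slower than the median counts as slow

    StragglerDetector(double multiplier) {
        this.multiplier = multiplier;
    }

    /** Median duration of the successfully completed tasks. */
    static double median(List<Long> completedMillis) {
        if (completedMillis.isEmpty()) {
            throw new IllegalArgumentException("no completed tasks yet");
        }
        List<Long> sorted = new ArrayList<>(completedMillis);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n % 2 == 1) {
            return sorted.get(n / 2).doubleValue();
        }
        return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    /** Ids of running tasks slower than multiplier * median: candidates for a speculative copy. */
    List<String> findStragglers(List<Long> completedMillis, Map<String, Long> runningElapsedMillis) {
        List<String> stragglers = new ArrayList<>();
        if (completedMillis.isEmpty()) {
            return stragglers; // nothing to compare against yet
        }
        double threshold = multiplier * median(completedMillis);
        // TreeMap only makes the output order deterministic (sorted by task id).
        for (Map.Entry<String, Long> task : new TreeMap<>(runningElapsedMillis).entrySet()) {
            if (task.getValue() > threshold) {
                stragglers.add(task.getKey());
            }
        }
        return stragglers;
    }
}

/** Runs the original attempt and a speculative copy; the first successful result wins. */
final class SpeculativeRunner {
    static <T> T runWithSpeculation(Callable<T> original, Callable<T> copy)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // invokeAny cancels the attempts that have not completed when it returns.
            return pool.invokeAny(Arrays.asList(original, copy));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

For example, with completed durations of 100, 110 and 120 ms and a multiplier of 1.5, the threshold is 165 ms, so a task still running after 200 ms would be flagged while one at 150 ms would not.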

This JIRA is an umbrella issue for applying this idea in Flink. Details will be 
appended later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10643) Bubble execution: Resource aware job execution

2018-10-22 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10643:
---

 Summary: Bubble execution: Resource aware job execution
 Key: FLINK-10643
 URL: https://issues.apache.org/jira/browse/FLINK-10643
 Project: Flink
  Issue Type: New Feature
  Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN
 Fix For: 1.8.0
 Attachments: image-2018-10-22-16-28-32-355.png

Today Flink supports various channel types, such as pipelined and blocking 
channels. A blocking channel means that data is persisted as a batch and can be 
consumed later. It also means that the downstream task cannot start processing 
data until its producer has finished, and that the downstream task depends only 
on this intermediate partition rather than on the upstream tasks.

By leveraging this characteristic, Flink already supports fine-grained failover, 
which builds failover regions to reduce failover cost. However, we can leverage 
this characteristic even further. As described in this 
[paper|http://www.vldb.org/pvldb/vol11/p746-yin.pdf] (VLDB 2018), *_Bubble 
Execution_* uses this characteristic not only to implement fine-grained 
failover, but also to balance resource utilization against job performance. As 
shown in the paper (and in the following chart), with 50% of the resources it 
incurs an average slowdown of 25% (0.75 speedup) on the TPC-H benchmark.

!image-2018-10-22-16-28-32-355.png!

This JIRA is an umbrella issue that tries to apply the ideas of this paper to Flink.
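Because a consumer behind a blocking channel depends only on the persisted intermediate partition, tasks can be grouped into regions by following only pipelined edges. The Java sketch below computes such regions as connected components using union-find. It is a simplified, hypothetical illustration of the failover-region idea mentioned above (tasks are plain integer ids, and `ResultEdge` and `PipelinedRegions` are made-up names); it does not model how the paper sizes bubbles against the available resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical edge from a producer task to a consumer task. */
final class ResultEdge {
    final int producer;
    final int consumer;
    final boolean blocking; // true: data is persisted; false: data is pipelined

    ResultEdge(int producer, int consumer, boolean blocking) {
        this.producer = producer;
        this.consumer = consumer;
        this.blocking = blocking;
    }
}

final class PipelinedRegions {
    private static int find(int[] parent, int task) {
        while (parent[task] != task) {
            parent[task] = parent[parent[task]]; // path halving keeps trees shallow
            task = parent[task];
        }
        return task;
    }

    /** Groups tasks 0..numTasks-1 into regions connected only by pipelined edges. */
    static List<Set<Integer>> compute(int numTasks, List<ResultEdge> edges) {
        int[] parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) {
            parent[i] = i;
        }
        for (ResultEdge edge : edges) {
            if (!edge.blocking) { // blocking edges act as region boundaries
                parent[find(parent, edge.producer)] = find(parent, edge.consumer);
            }
        }
        Map<Integer, Set<Integer>> regionsByRoot = new TreeMap<>();
        for (int i = 0; i < numTasks; i++) {
            regionsByRoot.computeIfAbsent(find(parent, i), root -> new TreeSet<>()).add(i);
        }
        return new ArrayList<>(regionsByRoot.values());
    }
}
```

For a chain 0 → 1 (pipelined), 1 → 2 (blocking), 2 → 3 (pipelined), this yields the regions {0, 1} and {2, 3}. Assuming the partition persisted by task 1 is still available, a failure in task 3 would then only require restarting region {2, 3}.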





Fwd: reaching out from k8s Big Data SIG

2018-10-22 Thread Erik Erlandson
Hi Flink community,

I serve as chair on the Kubernetes Big Data SIG, and we've had some
interest from the SIG in learning more about what the Flink community is
working on in the kubernetes space.

I'm putting out feelers to see if anybody from the community would be
interested in giving a brief talk and/or demo of Flink integrations with
k8s at one of our regular meetings.

The Big Data SIG holds bi-weekly meetings at 10am Pacific time (next
meeting is Oct 31). Feel free to attend our meetings, or visit our slack
channel:
https://kubernetes.slack.com/messages/C0ELB338T/

More SIG info is available on our regular meeting minutes doc:
https://docs.google.com/document/d/1pnF38NF6N5eM8DlK088XUW85Vms4V2uTsGZvSp8MNIA/

Thanks!
Erik Erlandson


[jira] [Created] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1

2018-10-22 Thread xueyu (JIRA)
xueyu created FLINK-10642:
-

 Summary: CodeGen split fields errors when maxGeneratedCodeLength equals 1
 Key: FLINK-10642
 URL: https://issues.apache.org/jira/browse/FLINK-10642
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.1
Reporter: xueyu
Assignee: xueyu


Several tests fail under a special configuration that sets 
maxGeneratedCodeLength to 1, e.g.:
  CalcITCase.testFilterOnCustomType:260 » InvalidProgram Table program cannot be...
  JavaTableEnvironmentITCase.testAsFromAndToPojo:394 » InvalidProgram Table prog...
  JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 » InvalidProgram
  JavaTableEnvironmentITCase.testAsFromPojo:288 » InvalidProgram Table program c...
  JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 » InvalidProgram Ta...
  JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 » InvalidProgram ...
  TimeAttributesITCase.testPojoSupport:566 » JobExecution Job execution failed.





[jira] [Created] (FLINK-10641) Slow when applying new containers

2018-10-22 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10641:
--

 Summary: Slow when applying new containers
 Key: FLINK-10641
 URL: https://issues.apache.org/jira/browse/FLINK-10641
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.1
Reporter: Jiayi Liao
Assignee: Jiayi Liao


When requesting containers from YARN, the containers are received and returned 
over and over again, like this:
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1535124617388_1936_01_000929 - Remaining pending container requests: 0
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1535124617388_1936_01_000929.

Sometimes this goes on for several minutes, which is far longer than we expect.





Re: Re: Enable Slot Resource Profile for Resource Management

2018-10-22 Thread 宋辛童(五藏)
Thanks, Tison~

Thank You,
Tony Xintong Song
--
From: Tzu-Li Chen
Date: 2018-10-22 18:45:36
To: ; 宋辛童(五藏)
Subject: Re: Enable Slot Resource Profile for Resource Management

Hi Tony,

I see the corresponding JIRA[1], and it looks like you haven't attached it to a 
mailing list thread. I will do that for you. I also wonder whether you have 
asked for the contributor bit, so that you can assign the JIRA to yourself.

For the topic, I left some comments on the JIRA. Briefly, I am open to taking 
advantage of the current, somewhat neglected ResourceProfile APIs.

Anyway, welcome on board!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-10640

宋辛童(五藏) wrote on Mon, Oct 22, 2018 at 6:39 PM:

Hi all,

We are planning to do some work related to Flink’s resource management. 
Specifically, we are trying to enable ResourceProfile-based resource management. 
Here is a brief description of our key ideas. Please let me know what you think 
about this.

Thank You,
Tony Xintong Song 


Re: Enable Slot Resource Profile for Resource Management

2018-10-22 Thread Tzu-Li Chen
Hi Tony,

I see the corresponding JIRA[1], and it looks like you haven't attached it to
a mailing list thread. I will do that for you. I also wonder whether you have
asked for the contributor bit, so that you can assign the JIRA to yourself.

For the topic, I left some comments on the JIRA. Briefly, I am open to taking
advantage of the current, somewhat neglected ResourceProfile APIs.

Anyway, welcome on board!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-10640


宋辛童(五藏) wrote on Mon, Oct 22, 2018 at 6:39 PM:

> Hi all,
>
> We are planning to do some work related to
> Flink’s resource management. Specifically, we are trying to enable
> ResourceProfile-based resource management.
> Here is a brief description of our key ideas. Please let me know what you
> think about this.
>
> Thank You,
> Tony Xintong Song
>


Enable Slot Resource Profile for Resource Management

2018-10-22 Thread 宋辛童(五藏)
Hi all,

We are planning to do some work related to Flink’s resource management. 
Specifically, we are trying to enable ResourceProfile-based resource management. 
Here is a brief description of our key ideas. Please let me know what you think 
about this.

Thank You,
Tony Xintong Song

[jira] [Created] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-10-22 Thread Xintong Song (JIRA)
Xintong Song created FLINK-10640:


 Summary: Enable Slot Resource Profile for Resource Management
 Key: FLINK-10640
 URL: https://issues.apache.org/jira/browse/FLINK-10640
 Project: Flink
  Issue Type: New Feature
  Components: ResourceManager
Reporter: Xintong Song


Motivation & Background
 * The existing concept of task slots roughly represents how many pipelines of 
tasks a TaskManager can hold. However, it does not consider the differences in 
resource needs and usage of individual tasks. Enabling resource profiles for 
slots may allow Flink to better allocate execution resources according to the 
tasks' fine-grained resource needs.
 * The community version of Flink already contains APIs and some implementation 
for slot resource profiles. However, this logic is not actually used. (The 
ResourceProfile of slot requests is by default set to UNKNOWN, with negative 
values, and thus matches any given slot.)
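The last point can be illustrated with a small Java sketch. `SlotProfile` and `canSatisfy` are hypothetical simplifications with only two dimensions, not Flink's `ResourceProfile` API. Assuming matching is a per-dimension "offered >= requested" check, a request whose values are all negative is satisfied by every slot, which is why an UNKNOWN request effectively disables resource-aware matching.

```java
/** Hypothetical two-dimensional slot profile; the real ResourceProfile has more fields. */
final class SlotProfile {
    /** Negative values stand for "unknown", mirroring the description above. */
    static final SlotProfile UNKNOWN = new SlotProfile(-1.0, -1);

    final double cpuCores;
    final int memoryMb;

    SlotProfile(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** True if a slot offering this profile can host a request for {@code required}. */
    boolean canSatisfy(SlotProfile required) {
        // With negative requested values, both comparisons are trivially true.
        return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
    }
}
```

Even a tiny slot such as `new SlotProfile(0.1, 64)` satisfies `UNKNOWN`, so under this scheme the scheduler cannot tell a fitting slot from an undersized one.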

Preliminary Design
 * Slot Management
 A slot represents a certain amount of resources for a single pipeline of tasks 
to run on a TaskManager. Initially, a TaskManager does not have any slots, only 
a total amount of resources. When allocating, the ResourceManager finds suitable 
TMs and generates new slots on them for the tasks to run, according to the slot 
requests. Once generated, a slot's size (resource profile) does not change until 
it is freed. The ResourceManager can apply different, pluggable strategies to 
allocate slots from TaskManagers.
 * TM Management
 The size and number of TaskManagers, and when to start them, can also be 
flexible. TMs can be started and released dynamically, and may have different 
sizes. We may have many different, pluggable strategies, e.g., an elastic 
session that can run multiple jobs like session mode while dynamically adjusting 
the size of the session (number of TMs) according to the real-time workload.
 * About Slot Sharing
 Slot sharing is a good heuristic for easily calculating how many slots are 
needed to get the job running, and for achieving better utilization when slots 
have no resource profiles. However, with resource profiles enabling 
finer-grained resource management, each individual task has its own specific 
resource needs, and it does not make much sense for multiple tasks to share the 
resources of the same slot. Instead, we may introduce locality 
preferences/constraints to support the semantics of putting tasks in the 
same/different TMs in a more general way.
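The slot-management idea (a TaskManager starts with a pool of resources, and slots are carved out on demand and keep a fixed size until freed) can be sketched as below. `DynamicTaskManager` and `SlotSpec` are hypothetical names, resources are reduced to CPU cores and memory, and the fit check on a single TM is only one possible allocation strategy, not the design the proposal settles on.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical requested slot size: CPU cores and memory only. */
final class SlotSpec {
    final double cpuCores;
    final int memoryMb;

    SlotSpec(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** Hypothetical TM that starts with no slots, only a total resource budget. */
final class DynamicTaskManager {
    private double freeCpuCores;
    private int freeMemoryMb;
    private final Map<Integer, SlotSpec> slots = new HashMap<>();
    private int nextSlotId = 0;

    DynamicTaskManager(double totalCpuCores, int totalMemoryMb) {
        this.freeCpuCores = totalCpuCores;
        this.freeMemoryMb = totalMemoryMb;
    }

    /** Carves a slot of exactly the requested size; returns its id, or -1 if it does not fit. */
    int allocate(SlotSpec request) {
        if (request.cpuCores > freeCpuCores || request.memoryMb > freeMemoryMb) {
            return -1; // a ResourceManager could then try another TM or start a new one
        }
        freeCpuCores -= request.cpuCores;
        freeMemoryMb -= request.memoryMb;
        int slotId = nextSlotId++;
        slots.put(slotId, request); // the slot's size stays fixed until it is freed
        return slotId;
    }

    /** Returns the slot's resources to the TM's pool. */
    void free(int slotId) {
        SlotSpec released = slots.remove(slotId);
        if (released != null) {
            freeCpuCores += released.cpuCores;
            freeMemoryMb += released.memoryMb;
        }
    }

    int freeMemoryMb() {
        return freeMemoryMb;
    }

    int numSlots() {
        return slots.size();
    }
}
```

Because a slot's size never changes while it is allocated, freeing it is simply a matter of returning its recorded size to the pool.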





Re: [DISCUSS] Improve broadcast serialization

2018-10-22 Thread Piotr Nowojski
Sounds good to me :)

Piotrek

> On 19 Oct 2018, at 08:34, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> I agree with the additional thoughts of a), b) and c).
> 
> In all the current implementations of ChannelSelector, the selected channels 
> are either one or all, so it makes sense to change the interface as you 
> suggested, as long as we do not add other selectors for partial channels in 
> the future. The single-channel implementation would also reduce some overhead 
> from arrays and loops. For the broadcast selector, there is no need to return 
> channels from the selector, and we can add a shortcut for this special 
> implementation.
> 
> Comparing 3 vs 5, I still prefer 3 for now, since it can reuse the current 
> network processing. We create only one BufferBuilder for all the channels and 
> build a separate BufferConsumer for every channel, all sharing the same 
> BufferBuilder. To do so, we only need a few changes on the RecordWriter side 
> and do not touch the downstream components of the network stack. This alone 
> already gains most of the performance benefit, since the temporary 
> serialization buffer is copied only once, into one BufferBuilder.
> 
> I can first create the JIRA for the single-channel interface if you have not 
> done that already, and then continue with the copying step by step. :)
> 
> Best,
> Zhijiang
> --
> From: Piotr Nowojski 
> Sent: Thursday, October 18, 2018 17:47
> To: Zhijiang(wangzhijiang999) 
> Cc: Nico Kruber ; dev 
> Subject: Re: [DISCUSS] Improve broadcast serialization
> 
> Hey,
> 
> I also think that the 3rd option is the most promising; however, the logic of 
> “dirty” channels might cause some overhead. I was also thinking about another 
> option:
> 
> 5. When ‘emit’ is called on BroadcastRecordWriter, we could write the record to 
> the common/shared BufferBuilder, but somehow mark it as targeted at only one 
> channel. We would send it over the network to all of the receivers, but all 
> except one would ignore it. This might be easier to implement in 
> BroadcastRecordWriter, but would require extra logic on the receiver side. 
> With respect to performance, it also might be better than 3.
> 
> Couple of more thoughts:
> 
> a) if we select BroadcastRecordWriter, literally the only way it can be 
> polluted by non-broadcast writes is latency markers via `randomEmit`. When 
> choosing 3 vs 5: mixing broadcast and non-broadcast writes happens very rarely, 
> so we shouldn’t optimise for it, but pick whatever is easiest to implement.
> b) there are no use cases where `ChannelSelector` returns anything other than 
> a single channel or broadcast.
> 
> Point b) brings me to one more thing. I was once playing with simplifying the 
> `ChannelSelector` interface by adding a new `SingleChannelSelector` with the 
> method:
> 
> `int selectChannel(T record, int numChannels);`
> 
> It resulted in a ~10% speed-up for the network stack alone (by avoiding the 
> overhead of creating singleton arrays and iterating over them). I didn’t 
> follow up on this, because the performance gain wasn’t huge, while it 
> complicated `RecordWriter`, since it had to handle either 
> `SingleChannelSelector` or `ChannelSelector`. Now that I realised that there 
> are no use cases for selecting more than one but not all of the channels, and 
> that since we go with broadcasting anyway we will have to special-case 
> `BroadcastPartitioner`, this is the perfect occasion to actually simplify the 
> implementation and drop this multi-channel ChannelSelector.
> 
> I think we should do this as a first step, in preparation for either 3. 
> or 5. (changing ChannelSelector signature to:
> 
> int selectChannel(T record, int numChannels);
> 
> )
> 
> What do you think?
> 
> Piotrek
> 
> On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) 
>  wrote:
> Hi Piotr,
> 
> Thanks for your replies and suggestions!
> 
> Regarding my rough idea of a skip index list, I agree with your concerns about 
> performance for the non-broadcast case and about the complicated 
> implementation. Although I think this idea is more unified in semantics for 
> the "emit", "broadcastEmit" and "randomEmit" APIs, it is probably not worth 
> going deep into it for now, given the global changes it requires.
> 
> Currently RecordWriter provides three main methods to write elements with 
> different semantics:
> 
> "broadcastEmit" writes the element to all the channels; currently used for 
> watermarks.
> "randomEmit" writes the element to one random channel; currently used for 
> latency markers.
> "emit" writes the element to some channels via ChannelSelector; currently used 
> for normal records. The selected channels may be one, some, or all.
> 
> If we want to retain these APIs for different requirements, then the 
> RecordWriter should not be aware of which kinds of elements are written 
> via these APIs, so we should not make any assumptions in the implementation. In 
> detail, I know that "randomEmit" is only used for latency markers currently, 

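The two ideas discussed in this thread, a single-channel `selectChannel(T record, int numChannels)` signature and (option 3) serializing a broadcast record once into a buffer shared by all channels, can be illustrated with a toy Java sketch. Everything here is hypothetical: `ToyRecordWriter` and `HashSelector` are invented names, and `ByteBuffer.duplicate()` merely stands in for building one BufferConsumer per channel over a shared BufferBuilder. Flink's actual network stack is not modeled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** The simplified selector discussed in the thread: exactly one target channel. */
interface SingleChannelSelector<T> {
    int selectChannel(T record, int numChannels);
}

/** Hypothetical hash-based selector, for illustration only. */
final class HashSelector implements SingleChannelSelector<String> {
    @Override
    public int selectChannel(String record, int numChannels) {
        return Math.floorMod(record.hashCode(), numChannels);
    }
}

/** Toy writer (not Flink's RecordWriter); each channel is a list of buffer views. */
final class ToyRecordWriter {
    private final SingleChannelSelector<String> selector;
    private final List<List<ByteBuffer>> channels = new ArrayList<>();
    private final Random random = new Random();

    ToyRecordWriter(int numChannels, SingleChannelSelector<String> selector) {
        this.selector = selector;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    private static ByteBuffer serialize(String record) {
        return ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
    }

    /** "emit": one channel picked by the selector, with no singleton int[] to allocate. */
    void emit(String record) {
        channels.get(selector.selectChannel(record, channels.size())).add(serialize(record));
    }

    /** "randomEmit": one random channel (used for latency markers, per the thread). */
    void randomEmit(String record) {
        channels.get(random.nextInt(channels.size())).add(serialize(record));
    }

    /** "broadcastEmit": serialize once, then give every channel its own view of the same bytes. */
    void broadcastEmit(String record) {
        ByteBuffer shared = serialize(record).asReadOnlyBuffer();
        for (List<ByteBuffer> channel : channels) {
            channel.add(shared.duplicate()); // shares content, keeps its own read position
        }
    }

    List<ByteBuffer> channel(int index) {
        return channels.get(index);
    }

    /** Reads a view as a consumer would; only that view's position advances. */
    static String read(ByteBuffer view) {
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The design point shows up in `emit`: with a single-channel selector, the writer indexes one channel directly instead of allocating and iterating over an array, which is the overhead the thread credits for the ~10% network-stack speed-up.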
[jira] [Created] (FLINK-10639) Fix java syntax error in document

2018-10-22 Thread sunjincheng (JIRA)
sunjincheng created FLINK-10639:
---

 Summary: Fix java syntax error in document
 Key: FLINK-10639
 URL: https://issues.apache.org/jira/browse/FLINK-10639
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table API & SQL
Affects Versions: 1.6.1, 1.7.0
Reporter: sunjincheng
Assignee: Hequn Cheng
 Fix For: 1.7.0, 1.6.1
 Attachments: image-2018-10-22-16-54-10-305.png

Because 
[StreamTableSourceFactory|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/StreamTableSourceFactory.scala]
 is a trait, the Java example in the documentation should use the "implements" 
keyword.

!image-2018-10-22-16-54-10-305.png!
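From Java, a Scala trait such as StreamTableSourceFactory is seen as an interface, so a Java class has to `implements` it; `extends` only applies to classes. A minimal illustration with a hypothetical stand-in interface (`SourceFactory`, `MySourceFactory` and `createSource` are invented names, not the Flink API):

```java
import java.util.Map;

/** Hypothetical stand-in for a Scala trait as Java code sees it: an interface. */
interface SourceFactory<T> {
    T createSource(Map<String, String> properties);
}

/** Java must use "implements" here; "class ... extends SourceFactory<String>" does not compile. */
final class MySourceFactory implements SourceFactory<String> {
    @Override
    public String createSource(Map<String, String> properties) {
        return properties.getOrDefault("name", "default-source");
    }
}
```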





[jira] [Created] (FLINK-10638) Invalid table scan resolution for temporal join queries

2018-10-22 Thread Timo Walther (JIRA)
Timo Walther created FLINK-10638:


 Summary: Invalid table scan resolution for temporal join queries
 Key: FLINK-10638
 URL: https://issues.apache.org/jira/browse/FLINK-10638
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Registered tables that contain a temporal join are not properly resolved when 
performing a table scan.

{code}
LogicalProject(amount=[*($0, $4)])
  LogicalFilter(condition=[=($3, $1)])
    LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
      LogicalTableScan(table=[[_DataStreamTable_0]])
      LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
{code}





[jira] [Created] (FLINK-10637) Start MiniCluster with random REST port

2018-10-22 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10637:
-

 Summary: Start MiniCluster with random REST port
 Key: FLINK-10637
 URL: https://issues.apache.org/jira/browse/FLINK-10637
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.6.1, 1.5.4, 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.6, 1.6.3, 1.7.0


The {{MiniCluster}} picks a random port for the {{RpcService}} but not for the 
REST server endpoint, so the REST endpoint falls back to the default port 
{{8081}}. This can lead to port conflicts if tests are executed concurrently.

I propose to rename {{MiniClusterResource}} to 
{{MiniClusterResourceWithRestClient}} and add a new {{MiniClusterResource}} 
that only starts a {{MiniCluster}} with the REST port set to 0.
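The proposal relies on the usual convention that binding to port 0 lets the operating system choose a free port, which the server then reads back. The sketch below shows that convention with plain `java.net.ServerSocket`; it does not use the MiniCluster or any Flink configuration option, and `EphemeralPortDemo` is a made-up name.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Demonstrates OS-assigned ("port 0") binding with plain JDK sockets. */
final class EphemeralPortDemo {
    /**
     * Binds two server sockets to port 0. The operating system assigns each a
     * free port, so both bind successfully; two servers that both insist on a
     * fixed port such as 8081 would conflict instead.
     */
    static int[] bindTwoOnPortZero() throws IOException {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        }
    }
}
```

A test that starts a server this way would then need to pass the actually bound port to its REST client, rather than assuming 8081.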


