Re: [DISCUSS] Detection Flink Backpressure

2019-01-04 Thread Piotr Nowojski
Hi,

In that case, I think that instead of fixing the current back pressure monitoring 
mechanism, it would be better to replace it with a new one based on output 
queue length. I haven't thought it through, though, especially with respect to 
performance implications, but my gut feeling is that it should be solvable one 
way or another.

Piotrek

> On 3 Jan 2019, at 20:05, Ken Krugler  wrote:
> 
> There’s the related issue of Async I/O not showing up in back pressure 
> reporting, also due to the same issue of threads not being sampled.
> 
> — Ken
> 
>> On Jan 3, 2019, at 10:25 AM, Jamie Grier  wrote:
>> 
>> One unfortunate problem with the current back-pressure detection mechanism
>> is that it doesn't work well with all of our sources.  The problem is that
>> some sources (Kinesis for sure) emit elements from threads Flink knows
>> nothing about and therefore those stack traces aren't sampled.  The result
>> is that you never see back-pressure detected in the first chain of a Flink
>> job containing that source.
>> 
>> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski  wrote:
>> 
>>> Hi all,
>>> 
>>> peiliping: I think your idea could be problematic for a couple of reasons.
>>> A minor concern is that the checkpoint time is affected not only by back
>>> pressure, but also by how long it takes to actually perform the checkpoint.
>>> The bigger issues are that this bottleneck detection would be limited to
>>> the time during checkpointing (what if one has checkpoints only once every
>>> hour? Or none at all?) AND that performance/bottlenecks may change
>>> significantly during checkpointing (for example, writing the state of the
>>> first operator to a DFS can indirectly affect downstream operators).
>>> 
>>> The idea of detecting back pressure/bottlenecks using output/input buffers
>>> is much more natural: in the end, almost by definition, if the output
>>> buffers are full, the given task is back pressured.
>>> 
>>> Both input and output queue lengths are already exposed via metrics, so
>>> developers have access to the raw data to manually calculate/detect
>>> bottlenecks. It would be nice to automatically aggregate those metrics and
>>> provide ready-to-use metrics: boolean flags indicating whether a
>>> task/stage/job is back pressured or not (a rough sketch of such an
>>> aggregation is included at the end of this thread).
>>> 
>>> Replacing the current back pressure detection mechanism, which probes the
>>> threads and checks which of them are waiting for buffers, is another issue.
>>> Functionally it is equivalent to monitoring whether the output queues are
>>> full. It may be more hacky, but it gives the same results, so it wasn't
>>> high on my priority list to change/refactor. It would be nice to clean this
>>> up a little and unify the two, but using metrics can also mean some
>>> additional work, since there are some known metrics-related performance
>>> issues.
>>> 
>>> Piotrek
>>> 
>>>> On 3 Jan 2019, at 10:35, peiliping  wrote:
>>>> 
>>>> I have an idea about detecting backpressure (i.e. the blocking operators)
>>>> via the checkpoint barrier.
>>>> 
>>>> I have some Flink jobs with checkpointing enabled, and their checkpoints
>>>> take a long time to complete. I need to find the blocking operators,
>>>> which is the same problem as backpressure detection.
>>>> 
>>>> From a checkpoint object I can get a timestamp, which is the start time,
>>>> and then I compute a metric in
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
>>>> The metric is the delta between checkpoint.timestamp and the time when
>>>> StreamTask.executeCheckpointing() is invoked; I call it
>>>> checkpoint-delay-time. It is similar to the end-to-end duration metric of
>>>> a checkpoint, but it does not include the async parts.
>>>> 
>>>> For example, take a chain of tasks A (parallelism: 2) ---> B
>>>> (parallelism: 3) ---> C (parallelism: 1):
>>>> 
>>>> Checkpoint-delay-value-A: the max checkpoint-delay-time over A
>>>> (2 instances)
>>>> 
>>>> Checkpoint-delay-value-B: the max checkpoint-delay-time over B
>>>> (3 instances)
>>>> 
>>>> Checkpoint-delay-value-C: the max checkpoint-delay-time over C
>>>> (1 instance)
>>>> 
>>>> Then I can compute 3 more deltas from these checkpoint-delay-values:
>>>> 
>>>> result-0-->A  = Checkpoint-delay-value-A - 0
>>>> 
>>>> result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A
>>>> 
>>>> result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B
>>>> 
>>>> Any result-X-->Y that is longer than 5s (or some other threshold) should
>>>> be the black sheep (a small sketch of this delta computation is included
>>>> at the end of this thread).
>>>> 
>>>> On 2019/1/3 at 2:43 PM, Yun Gao wrote:
>>>>> Hello liping,
>>>>> 
>>>>> Thank you for proposing to optimize the backpressure detection! From our
>>>>> previous experience, we think the InputBufferPoolUsageGauge and
>>>>> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for
>>>>> a list of tasks A ---> B ---> C, if we found t
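
To make the aggregation discussed above concrete, here is a minimal,
self-contained sketch of turning raw per-subtask output buffer pool usage
values (e.g. Flink's existing outPoolUsage gauge, a value in [0, 1]) into
boolean back-pressure flags per task and per job. How the raw values are
collected (metric reporter, REST API, ...) is left open; the 0.95 threshold
and all class/method names are made up for illustration.

import java.util.List;
import java.util.Map;

/**
 * Sketch only: aggregates per-subtask output buffer pool usage into
 * boolean back-pressure flags. Not part of Flink.
 */
public final class BackPressureFlags {

    // Usage at or above this is treated as "output buffers full" (assumed threshold).
    private static final double FULL = 0.95;

    /** A task is back pressured if any of its subtasks cannot get output buffers. */
    public static boolean taskBackPressured(List<Double> outPoolUsagePerSubtask) {
        return outPoolUsagePerSubtask.stream().anyMatch(u -> u >= FULL);
    }

    /** A job is back pressured if any of its tasks is. */
    public static boolean jobBackPressured(Map<String, List<Double>> outPoolUsageByTask) {
        return outPoolUsageByTask.values().stream()
                .anyMatch(BackPressureFlags::taskBackPressured);
    }

    public static void main(String[] args) {
        // Example: the "map" task has one subtask with a full output pool.
        Map<String, List<Double>> usage = Map.of(
                "source", List.of(0.2, 0.3),
                "map", List.of(1.0, 0.4));
        System.out.println("map back pressured: " + taskBackPressured(usage.get("map")));
        System.out.println("job back pressured: " + jobBackPressured(usage));
    }
}

A similar flag based on the inPoolUsage gauge could help narrow down which
task is the actual bottleneck rather than merely back pressured.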

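Similarly, a small sketch of the checkpoint-delay computation from peiliping's
message: given the maximum checkpoint-delay-time per stage (the delta between
the checkpoint's trigger timestamp and the moment the task starts executing
the checkpoint), compute the per-edge deltas and flag any stage whose delta
exceeds the threshold. The 5s threshold comes from the message; the class name
and the example numbers are made up.

import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: locates the "black sheep" stage from per-stage checkpoint delays. */
public final class CheckpointDelayAnalysis {

    private static final long THRESHOLD_MS = 5_000; // 5s, as suggested in the thread

    public static void main(String[] args) {
        // Maximum checkpoint-delay-time per stage (ms), in topological order,
        // e.g. A (parallelism 2) ---> B (parallelism 3) ---> C (parallelism 1).
        Map<String, Long> maxDelayPerStage = new LinkedHashMap<>();
        maxDelayPerStage.put("A", 800L);
        maxDelayPerStage.put("B", 1_200L);
        maxDelayPerStage.put("C", 9_700L);

        long previousDelay = 0L;
        String previousStage = "0"; // "0" stands for the checkpoint trigger itself
        for (Map.Entry<String, Long> stage : maxDelayPerStage.entrySet()) {
            long delta = stage.getValue() - previousDelay; // result-X-->Y
            String mark = delta > THRESHOLD_MS ? "  <-- black sheep" : "";
            System.out.printf("result-%s-->%s = %d ms%s%n",
                    previousStage, stage.getKey(), delta, mark);
            previousDelay = stage.getValue();
            previousStage = stage.getKey();
        }
    }
}

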
Re: [DISCUSSION] Complete restart after successive failures

2019-01-04 Thread Piotr Nowojski
That's a good point, Till. Blacklisting TMs could handle this. One scenario 
that might be problematic is if a clean restart is needed after a more or less 
random number of job resubmissions, for example if resource leakage has 
different rates on different nodes. In such a situation, if we blacklist and 
restart TMs one by one, the job can keep failing constantly, with the failures 
caused each time by a different TM. It could end up in an endless loop in some 
scenarios/setups, whereas Gyula's proposal would restart all of the TMs at 
once, resetting the leakage on all of the TMs at the same time and making a 
successful restart possible.

I still think that blacklisting TMs is the better way to do it, but maybe we 
still need some kind of limit, e.g. after N blacklistings restart all TMs. 
This would add some additional complexity, though.

Piotrek
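
A minimal sketch of the limit mentioned above: blacklist individual TMs first
and escalate to a full restart of all containers once N TMs have been
blacklisted. This is not actual Flink scheduler code; the class, the action
enum, and the reset-on-restart behaviour are assumptions for illustration.

/** Sketch only: escalates from blacklisting single TMs to a full restart. */
public final class FailureEscalationPolicy {

    public enum Action { BLACKLIST_TM, FULL_RESTART }

    private final int maxBlacklistedTms; // the "N" from the discussion
    private int blacklistedTms;

    public FailureEscalationPolicy(int maxBlacklistedTms) {
        this.maxBlacklistedTms = maxBlacklistedTms;
    }

    /** Called whenever the failure statistics point at a specific TM. */
    public Action onTaskManagerSuspected(String taskManagerId) {
        blacklistedTms++;
        if (blacklistedTms > maxBlacklistedTms) {
            blacklistedTms = 0;          // a full restart resets the count (and the leak)
            return Action.FULL_RESTART;  // release all containers (TM and JM) and restart
        }
        return Action.BLACKLIST_TM;      // only blacklist/replace this TM
    }
}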

> On 3 Jan 2019, at 13:59, Till Rohrmann  wrote:
> 
> Hi Gyula,
> 
> I see the benefit of having such an option. In fact, it goes in a similar
> direction as the currently ongoing discussion about blacklisting TMs. In
> the end it could work by reporting failures to the RM which aggregates some
> statistics for the individual TMs. Based on some thresholds it could then
> decide to free/blacklist a specific TM. Whether to blacklist or restart a
> container could then be a configurable option.
> 
> Cheers,
> Till
> 
> On Wed, Jan 2, 2019 at 1:15 PM Piotr Nowojski  wrote:
> 
>> Hi Gyula,
>> 
>> Personally I do not see a problem with providing such an option of “clean
>> restart” after N failures, especially if we set the default value for N to
>> +infinity. However guys working more with Flink’s scheduling systems might
>> have more to say about this.
>> 
>> Piotrek
>> 
>>> On 29 Dec 2018, at 13:36, Gyula Fóra  wrote:
>>> 
>>> Hi all!
>>> 
>>> In the past years of running Flink in production we have seen a huge
>>> number of scenarios where Flink jobs can go into unrecoverable failure
>>> loops and only a complete manual restart helps.
>>> 
>>> This is in most cases due to memory leaks in the user program, leaking
>>> threads, etc., and it leads to a failure loop because the job is restarted
>>> within the same JVM (TaskManager). After each restart the leak gets worse
>>> and worse, eventually crashing the TMs one after the other and never
>>> recovering.
>>> 
>>> These issues are extremely hard to debug (they might only cause problems
>>> after a few failures) and can cause long-lasting instabilities.
>>> 
>>> I suggest we add an option that would trigger a complete restart after
>>> every so many failures. This would release all containers (TM and JM) and
>>> restart everything.
>>> 
>>> The only argument I see against this is that it might further hide the
>>> root cause of the problem on the job/user side. While this is true, a
>>> stuck production job with crashing TMs is probably the worse of the two.
>>> 
>>> What do you think?
>>> 
>>> Gyula
>> 
>> 



Re: [DISCUSSION] Complete restart after successive failures

2019-01-04 Thread Gyula Fóra
Could it also work so that after so many tries it blacklists everything?
That way it would pretty much trigger a fresh restart.

Gyula

On Fri, 4 Jan 2019 at 10:11, Piotr Nowojski  wrote:

> That's a good point, Till. Blacklisting TMs could handle this. One scenario
> that might be problematic is if a clean restart is needed after a more or
> less random number of job resubmissions, for example if resource leakage has
> different rates on different nodes. In such a situation, if we blacklist and
> restart TMs one by one, the job can keep failing constantly, with the
> failures caused each time by a different TM. It could end up in an endless
> loop in some scenarios/setups, whereas Gyula's proposal would restart all of
> the TMs at once, resetting the leakage on all of the TMs at the same time
> and making a successful restart possible.
>
> I still think that blacklisting TMs is the better way to do it, but maybe we
> still need some kind of limit, e.g. after N blacklistings restart all TMs.
> This would add some additional complexity, though.
>
> Piotrek
>
> > On 3 Jan 2019, at 13:59, Till Rohrmann  wrote:
> >
> > Hi Gyula,
> >
> > I see the benefit of having such an option. In fact, it goes in a similar
> > direction as the currently ongoing discussion about blacklisting TMs. In
> > the end it could work by reporting failures to the RM which aggregates
> some
> > statistics for the individual TMs. Based on some thresholds it could then
> > decide to free/blacklist a specific TM. Whether to blacklist or restart a
> > container could then be a configurable option.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jan 2, 2019 at 1:15 PM Piotr Nowojski 
> wrote:
> >
> >> Hi Gyula,
> >>
> >> Personally I do not see a problem with providing such an option of
> “clean
> >> restart” after N failures, especially if we set the default value for N
> to
> >> +infinity. However guys working more with Flink’s scheduling systems
> might
> >> have more to say about this.
> >>
> >> Piotrek
> >>
> >>> On 29 Dec 2018, at 13:36, Gyula Fóra  wrote:
> >>>
> >>> Hi all!
> >>>
> >>> In the past years of running Flink in production we have seen a huge
> >>> number of scenarios where Flink jobs can go into unrecoverable failure
> >>> loops and only a complete manual restart helps.
> >>>
> >>> This is in most cases due to memory leaks in the user program, leaking
> >>> threads, etc., and it leads to a failure loop because the job is
> >>> restarted within the same JVM (TaskManager). After each restart the leak
> >>> gets worse and worse, eventually crashing the TMs one after the other
> >>> and never recovering.
> >>>
> >>> These issues are extremely hard to debug (they might only cause problems
> >>> after a few failures) and can cause long-lasting instabilities.
> >>>
> >>> I suggest we add an option that would trigger a complete restart after
> >>> every so many failures. This would release all containers (TM and JM)
> >>> and restart everything.
> >>>
> >>> The only argument I see against this is that it might further hide the
> >>> root cause of the problem on the job/user side. While this is true, a
> >>> stuck production job with crashing TMs is probably the worse of the two.
> >>>
> >>> What do you think?
> >>>
> >>> Gyula
> >>
> >>
>
>


[jira] [Created] (FLINK-11265) Invalid reference to AvroSinkWriter in example AvroKeyValueSinkWriter

2019-01-04 Thread Fokko Driesprong (JIRA)
Fokko Driesprong created FLINK-11265:


 Summary: Invalid reference to AvroSinkWriter in example 
AvroKeyValueSinkWriter
 Key: FLINK-11265
 URL: https://issues.apache.org/jira/browse/FLINK-11265
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.1
Reporter: Fokko Driesprong
Assignee: Fokko Driesprong
 Fix For: 1.7.2








[jira] [Created] (FLINK-11267) Update create_binary_release.sh to not create hadoop-specific releases

2019-01-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11267:


 Summary: Update create_binary_release.sh to not create 
hadoop-specific releases
 Key: FLINK-11267
 URL: https://issues.apache.org/jira/browse/FLINK-11267
 Project: Flink
  Issue Type: Sub-task
  Components: Release System
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.8.0








[jira] [Created] (FLINK-11268) Update deploy_staging_jars.sh to deploy multiple flink-shaded-hadoop2 versions

2019-01-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11268:


 Summary: Update deploy_staging_jars.sh to deploy multiple 
flink-shaded-hadoop2 versions
 Key: FLINK-11268
 URL: https://issues.apache.org/jira/browse/FLINK-11268
 Project: Flink
  Issue Type: Sub-task
  Components: Release System
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.8.0


We will have to make a few changes to the {{flink-shaded-hadoop2}} module to 
support deployment of multiple versions. Currently, neither the module name nor 
the version reflects the bundled hadoop version; as such, Maven cannot 
differentiate between two versions of this artifact that were built with a 
different {{hadoop.version}}.





[jira] [Created] (FLINK-11266) Only release hadoop-free Flink

2019-01-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11266:


 Summary: Only release hadoop-free Flink
 Key: FLINK-11266
 URL: https://issues.apache.org/jira/browse/FLINK-11266
 Project: Flink
  Issue Type: Improvement
  Components: Release System
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.8.0


Currently we release 10 different binaries (2 Scala versions * (4 hadoop 
versions + hadoop-free)), which has increased the size of our release to more 
than 2 GB.
Naturally, building Flink 10 times also takes a while, slowing down the release 
process.

However, the only difference between the hadoop versions is the bundled 
{{flink-shaded-hadoop2}} jar; the rest is completely identical.

I propose to stop releasing hadoop-specific distributions, and instead have us 
release multiple versions of {{flink-shaded-hadoop2}} that users copy into the 
hadoop-free distribution if required.





[jira] [Created] (FLINK-11269) Extend Download page to list optional components

2019-01-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11269:


 Summary: Extend Download page to list optional components
 Key: FLINK-11269
 URL: https://issues.apache.org/jira/browse/FLINK-11269
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


Extend the download page of the Flink Website to support listing additional 
optional jars to be added to a Flink distribution.

These optional jars may include (among others):
* flink-shaded-hadoop2 (see FLINK-11266)
* SQL format/connector jars
* metric reporters

Overall this will allow us to slim down flink-dist and make it more convenient 
for users to download these jars.





[jira] [Created] (FLINK-11270) Do not include hadoop in flink-dist by default

2019-01-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11270:


 Summary: Do not include hadoop in flink-dist by default
 Key: FLINK-11270
 URL: https://issues.apache.org/jira/browse/FLINK-11270
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.8.0


In order to build a hadoop-free Flink it is currently necessary to activate the 
{{without-hadoop}} profile.
We should revert this so that flink-dist is hadoop-free by default.





[jira] [Created] (FLINK-11271) Improve Kerberos Credential Distribution

2019-01-04 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11271:
-

 Summary: Improve Kerberos Credential Distribution
 Key: FLINK-11271
 URL: https://issues.apache.org/jira/browse/FLINK-11271
 Project: Flink
  Issue Type: Improvement
  Components: Security, YARN
Reporter: Rong Rong
Assignee: Rong Rong


This is the master JIRA for the improvement listed in:

https://docs.google.com/document/d/1rBLCpyQKg6Ld2P0DEgv4VIOMTwv4sitd7h7P5r202IE/edit#heading=h.y34f96ctioqk




