Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-09-01 Thread Zhu Zhu
Hi Elkhan,

>>Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."
>>We are intending to use Flink Real-time pipeline for Replay from
Hive/HDFS (from offline source), to have 1 single pipeline for both batch
and real-time. So for batch Flink job, the ?>>containers will be released
once the job is done.
>>I guess your job is real-time flink, so  you can share the  jars from
already long-running jobs.

This optimization works by making the flink-dist jar a public
distributed cache entry in YARN.
In this way, the localized dist jar can be shared by different YARN
applications, and it will not be removed when the YARN application that
localized it terminates.
This requires some changes in Flink though.
We will open a JIRA issue to contribute this optimization to the community.
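
For reference, a minimal sketch of what such a PUBLIC YARN local resource could look like, assuming the flink-dist jar has already been uploaded to a world-readable HDFS location (names and paths are illustrative, not the actual Flink change):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

object PublicDistJar {
  // Register an already-uploaded, world-readable flink-dist jar as a PUBLIC
  // local resource, so YARN caches it once per node and shares it across applications.
  def publicDistJarResource(fs: FileSystem, distJar: Path): LocalResource = {
    val status = fs.getFileStatus(distJar)
    LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromPath(distJar),
      LocalResourceType.FILE,
      LocalResourceVisibility.PUBLIC, // survives the application that localized it
      status.getLen,
      status.getModificationTime)
  }

  // Attach the shared dist jar to a container launch context (sketch only:
  // a real implementation would merge with the other local resources).
  def addDistJar(ctx: ContainerLaunchContext, fs: FileSystem, distJar: Path): Unit = {
    val resources = new java.util.HashMap[String, LocalResource]()
    resources.put("flink-dist.jar", publicDistJarResource(fs, distJar))
    ctx.setLocalResources(resources)
  }
}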

Thanks,
Zhu Zhu

SHI Xiaogang wrote on Saturday, August 31, 2019 at 12:57 PM:

> Hi Dadashov,
>
> You may have a look at method YarnResourceManager#onContainersAllocated
> which will launch containers (via NMClient#startContainer) after containers
> are allocated.
> The launching is performed in the main thread of YarnResourceManager and
> is synchronous/blocking. Consequently, the containers will be
> launched one by one.
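
Not Flink's actual source, but a minimal sketch of the blocking launch pattern described above, using only the public YARN NMClient API:

import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient

object SequentialLaunch {
  // Containers handed over by the ResourceManager are started strictly one after
  // another, because NMClient.startContainer is a synchronous (blocking) call.
  def launch(nmClient: NMClient,
             containers: Seq[Container],
             launchContextFor: Container => ContainerLaunchContext): Unit = {
    containers.foreach { container =>
      // Blocks until the NodeManager accepts the start request for this container,
      // which includes localizing the (large) uberjar on that node.
      nmClient.startContainer(container, launchContextFor(container))
    }
  }
}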
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov wrote on Saturday, August 31, 2019 at 2:37 AM:
>
>> Thanks everyone for the valuable input and for sharing your experience
>> tackling this issue.
>>
>> Regarding the suggestions:
>> - We provision some common jars on all cluster nodes  *-->*  but this
>> requires depending on the Infra Team's schedule for handling/updating the common jars
>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>> the size); it did not improve much. Only 100 containers could be started in time,
>> but then we receive:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>> start container.
>> This token is expired. current time is 1566422713305 found 1566422560552
>> Note: System times on machines may be out of sync. Check system time and 
>> time zones.
>>
>>
>> - It would be nice to have FLINK-13184, but the expected version it will
>> land in is 1.10
>> - Increase the replication factor --> It would be nice to have a Flink config option for
>> setting the replication factor only for Flink job jars, but not for the output. It
>> is also challenging to set the replication for a not-yet-existing directory;
>> new files will get the default replication factor. We will explore the HDFS cache
>> option (see the replication sketch right after this list).
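
A minimal sketch of raising the replication factor for the job jar only, using the plain HDFS client API and assuming the uberjar has already been uploaded (the path is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RaiseJarReplication {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical location of the already-uploaded job uberjar.
    val jarPath = new Path("hdfs:///user/flink/jobs/my-uber.jar")
    // Affects only this existing file; files created later (e.g. job output)
    // keep the cluster's default replication factor.
    fs.setReplication(jarPath, 10.toShort)
  }
}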
>>
>> Maybe another option can be:
>> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
>> jars from already started TaskManagers in P2P fashion, so that HDFS
>> replication is not a bottleneck.
>>
>> For comparison, a Spark job with the exact same size jar and 800 executors,
>> without any tuning, starts without any issue on the same cluster in less than a minute.
>>
>> *Further questions:*
>>
>> *@ SHI Xiaogang:*
>>
>> I see that all 800 requests are sent concurrently:
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 793.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 794.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>> ...
>>
>> Can you please elaborate on the part "As containers are launched and
>> stopped one after another"? Any pointer to the class/method in Flink?
>>
>> *@ Zhu Zhu:*
>>
>> Regarding "One optimization that we take is letting yarn to reuse the
>> flink-dist jar which was localized when running previous jobs."
>>
>> We are intending to use Flink Real-time pipeline for Replay from
>> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
>> and real-time. So for batch Flink job, the containers will be released once
>> the job is done.
>> I guess your job is real-time flink, so  you can share the  jars from
>> already long-running jobs.
>>
>> Thanks.
>>
>>
>> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:
>>
>>> I can think of 2 approaches:
>>>
>>> 1. Allow fli

Re: [ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-09-01 Thread Yu Li
Great to know, thanks for the efforts Bowen!

And I believe it's worth a release note in the original JIRA, wdyt? Thanks.

Best Regards,
Yu


On Sat, 31 Aug 2019 at 11:01, Bowen Li  wrote:

> Hi all,
>
> I'm glad to announce that, as #9494 was merged today,
> flink-connector-kinesis is now officially Apache 2.0 licensed in the master
> branch, and its artifact will be deployed to Maven Central as part of Flink
> releases starting from Flink 1.10.0. Users can then use the artifact off the
> shelf and no longer have to build and maintain it on their own.
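
Assuming the artifact keeps its current name and Scala suffix, pulling it from Maven Central once 1.10.0 is released would look roughly like this in sbt (coordinates and version are an assumption based on the announcement above):

// build.sbt -- hypothetical coordinates once the connector ships with Flink releases
libraryDependencies += "org.apache.flink" %% "flink-connector-kinesis" % "1.10.0"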
>
> It brings a much better user experience to our large AWS customer base by
> making their work simpler, smoother, and more productive!
>
> Thanks to everyone who participated in coding and review for driving this
> initiative forward.
>
> Cheers,
> Bowen
>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-01 Thread Yu Li
-1 on increasing the default delay to non-zero, for the reasons below:

a) I could see some concerns about setting the delay to zero in the very
original JIRA (FLINK-2993), but later on in FLINK-9158 we still decided to make
the change, so I'm wondering whether that decision also came from a
customer requirement? If so, how could we judge whether one requirement
overrides the other?

b) There could be valid reasons for both default values depending on the
use case, as well as a corresponding workaround (e.g., based on the latest
policy, setting the config manually to 10 s could resolve the problem
mentioned), and from earlier replies to this thread we can see users have
already taken action. Changing it back to non-zero again won't affect such
users but might cause surprises to those depending on 0 as the default.
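
For reference, a minimal sketch of the per-job override mentioned above; the attempt count is an illustrative value, and the equivalent config keys are noted in the comments:

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RestartDelayOverride {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Per-job override; the equivalent flink-conf.yaml keys are
    // "restart-strategy: fixed-delay" and "restart-strategy.fixed-delay.delay: 10 s".
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3,                  // illustrative number of restart attempts
      Time.seconds(10)))  // 10 s delay between attempts, as used by several users in this thread
    // ... build and execute the job as usual
  }
}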

Last but not least, no matter what decision we make this time, I'd suggest
making it final and documenting it explicitly in our release notes. Checking the
1.5.0 release notes [1] [2], it seems we didn't mention the change to the
default restart delay, and we'd better learn from that this time. Thanks.

[1]
https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html

Best Regards,
Yu


On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:

> +1 on what Zhu Zhu said.
>
> We also override the default to 10 s.
>
> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>
>> In our production, we usually override the restart delay to be 10 s.
>> We once encountered cases where external services were overwhelmed by
>> reconnections from frequently restarted tasks.
>> As a safer, though not optimal, option, a default delay larger than 0 s
>> is better in my opinion.
>>
>>
>> 未来阳光 <2217232...@qq.com> wrote on Friday, August 30, 2019 at 10:23 PM:
>>
>>> Hi,
>>>
>>>
>>> I think it's better to increase the default value. +1
>>>
>>>
>>> Best.
>>>
>>>
>>>
>>>
>>> -- Original Message --
>>> From: "Till Rohrmann";
>>> Sent: Friday, August 30, 2019, 10:07 PM
>>> To: "dev"; "user";
>>> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>>>
>>>
>>>
>>> Hi everyone,
>>>
>>> I wanted to reach out to you and ask whether decreasing the default delay
>>> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
>>> user reported that he would like to increase the default value because it
>>> can cause restart storms in case of systematic faults [2].
>>>
>>> The downside of increasing the default delay would be a slightly
>>> increased
>>> restart time if this config option is not explicitly set.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9158
>>> [2] https://issues.apache.org/jira/browse/FLINK-11218
>>>
>>> Cheers,
>>> Till
>>
>>


Re: Non incremental window function accumulates unbounded state with RocksDb

2019-09-01 Thread Yun Tang
Hi William

I think there might be another possible cause, since RocksDB can perform up to
10x worse than the heap state backend. Have you checked the current watermark of
the job (from the web UI) to see whether the windows trigger as expected, and
whether the RocksDB job is back-pressured? If data stays in the windows without
the windows being triggered, we would see larger state. (However, that still
cannot account for a factor of 400.)

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 18:22
To: Yun Tang ; user@flink.apache.org 
Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb


Thanks for your answer Yun.



I agree, I don’t believe that either; however, that is my empirical observation.
Those statistics are from savepoints. Basically the jobs are running against a
production Kafka, so no, not exactly the same input. However, these statistics
are from several runs distributed over time, so they should not contain temporal
effects. There are no failovers in the pipeline during runtime. Doing some
calculations on the size and pace of the data in the pipeline (how often we
receive data and how big the datatype is) yields that the buffered data in the
windows should be a little less than 200 MB, so the heap backend behaves
accordingly. I agree, the space amplification can’t be a factor of 400 and
still continue growing for RocksDB. I’ve spent some time trying to figure
this out, whether we are doing anything obscure, but I can’t find anything. So it
would be interesting to hear whether anyone has had the same experience.



The pipeline is currently running on Flink 1.7.2



Best regards and wish you a pleasant day,

William



From: Yun Tang 
Date: Friday, 30 August 2019 at 11:42
To: William Jonsson , "user@flink.apache.org" 

Cc: Fleet Perception for Maintenance 

Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb



Hi William



I don't believe the same job would have 70~80 GB of state with RocksDB while it's
only 200 MB with the HeapStateBackend, even though RocksDB has some space
amplification. Are you sure the job received the same input throughput with the
different state backends, and that both ran well without any failover? Could you
take a savepoint of the job with each state backend and compare the size of the
savepoints? What's more, which version of Flink did you use?



Best

Yun Tang



From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb



Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the
pseudocode example below it is keyed on the first letter of the string; in our
pipeline it is keyed on a predefined key) and processed in sliding windows with
a duration of 60 minutes, sliding every 10 minutes. The time characteristic is
event time and the windows process the data when they fire; there is no
incremental processing of the windowed data.
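
For context, a minimal sketch of the windowing just described; `stream` is a placeholder DataStream[String] from Kafka, and `Histogram` refers to the WindowFunction in the pseudocode at the end of this mail (key typing simplified to match that pseudocode):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// 60-minute windows sliding every 10 minutes, event time, non-incremental apply.
def histograms(stream: DataStream[String]): DataStream[Histogram] =
  stream
    .keyBy(_.head) // keyed on the first letter, as in the pseudocode
    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(10)))
    .apply(new Histogram) // all elements stay buffered in state until the window fires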

When running with the heap backend the state behaves “normally”, i.e. it stays
within the data size that is expected when the windows have buffered the data
(~200 MB for this application) and is bounded to around this size independent
of the lifetime of the processing pipeline. However, if the state backend is
changed to RocksDB, the state starts to grow indefinitely (that is our
observation; we haven’t seen it stop growing at least), reaching 70-80 GB in
just over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed
that the state consisted of data from the whole lifetime of the pipeline, of
about equal size for each day. I interpret this as the state having accumulated
old data which should have been deleted when the windows were cleared.
It is worth noting that the state consists of the input Strings only, so it
should have nothing to do with the histogram calculation?

I have tried reducing level0_file_num_compaction_trigger to 1, as well as the
base file size and the target file size, to trigger more compactions, in the
hope that the compactions would remove the obsolete data; this yielded no
improvement at all (it basically got worse).
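
For reference, a rough sketch of how such compaction settings can be wired into the RocksDB backend on Flink 1.7 via an OptionsFactory; the concrete values and checkpoint URI are illustrative, not a recommendation:

import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Makes level-0 compaction trigger earlier and keeps SST/level sizes small,
// roughly matching the tuning described above (values are illustrative).
class EagerCompactionOptions extends OptionsFactory {
  override def createDBOptions(current: DBOptions): DBOptions = current

  override def createColumnOptions(current: ColumnFamilyOptions): ColumnFamilyOptions =
    current
      .setLevel0FileNumCompactionTrigger(1)       // compact level-0 files as early as possible
      .setTargetFileSizeBase(4 * 1024 * 1024L)    // smaller SST target file size
      .setMaxBytesForLevelBase(16 * 1024 * 1024L) // smaller base level size
}

// Usage (hypothetical checkpoint URI):
//   val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
//   backend.setOptions(new EagerCompactionOptions)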

Please see the pseudocode below for a code example. The pipeline is more
complex than this and runs on other classes than the String input and
“histogram” output class. Do you have any input or ideas on how the state can be
manageable in the heap case but completely unmanageable with the RocksDB version?

Best regards,

William



class Histogram extends WindowFunction[String, Histogram, T, TimeWindow] {

  def process(key: T, window: TimeWindow, input: Iterable[String]) = {

    // Calculate the histogram

  }

  override def apply(key: T, window: TimeWindow, input: Iterable[String], out:
Collector[Histogr