Re: Adaptive load balancing

2020-09-23 Thread Zhijiang
Hi Krishnan,

Thanks for discussing this interesting scenario! 

It reminds me of a previously pending improvement: adaptive load balancing for the
rebalance partitioner.
Since rebalance mode can emit data to any node without precise placement requirements,
the data could be emitted adaptively based on the current backlog of each partition,
which roughly reflects the load condition of the consumers.

For your keyBy case, I guess the requirement is not only load balancing of the
processing, but also consistency with the preloaded cache.
Do you think it is possible to implement a custom partitioner that controls the
keyBy-style distribution based on the pre-defined cache distribution across nodes?
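
For illustration (this sketch is not from the original thread; the String key type
and the fixed group-to-subtask mapping are assumptions), such a custom partitioner
could look like:

import org.apache.flink.api.common.functions.Partitioner;

// Routes each key to the subtask that is assumed to hold its cache entries,
// instead of relying on keyBy's hash-based distribution.
public class CacheAwarePartitioner implements Partitioner<String> {

    private static final int NUM_CACHE_GROUPS = 10; // hypothetical pre-defined cache layout

    @Override
    public int partition(String key, int numPartitions) {
        // Map the key to its pre-assigned cache group, then to a subtask.
        int cacheGroup = Math.abs(key.hashCode() % NUM_CACHE_GROUPS);
        return cacheGroup % numPartitions;
    }
}

// Usage: events.partitionCustom(new CacheAwarePartitioner(), event -> event.getKey());

One caveat: partitionCustom returns a plain DataStream rather than a KeyedStream, so
keyed state and windows would not be available downstream.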

Best,
Zhijiang
--
From:Navneeth Krishnan 
Send Time:2020 Sep. 23 (Wed.) 02:21
To:user 
Subject:Adaptive load balancing

Hi All,

We are currently using Flink in production and use keyBy to perform a CPU-intensive
computation. There is a cache lookup for a set of keys, and since keyBy cannot
guarantee the data is sent to a single node, we are basically replicating the cache
on all nodes. This is causing memory problems for us, and we would like to explore
some options to mitigate the current limitations.

Is there a way to group a set of keys and send them to a set of nodes so that we
don't have to replicate the cache data on all nodes?

Has someone tried implementing hashing with adaptive load balancing, so that if a
node is busy processing, the data can be routed effectively to other nodes that are
free?

Any suggestions are greatly appreciated.

Thanks



Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Zhijiang
Congrats, Dian!


--
From:Yun Gao 
Send Time:2020 Aug. 27 (Thu.) 17:44
To:dev ; Dian Fu ; user 
; user-zh 
Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian !

 Best
 Yun


--
Sender:Marta Paes Moreira
Date:2020/08/27 17:42:34
Recipient:Yuan Mei
Cc:Xingbo Huang; jincheng sun; 
dev; Dian Fu; 
user; user-zh
Theme:Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!
On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

Congrats!
On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

Congratulations Dian!

Best,
Xingbo
jincheng sun wrote on Thu, Aug 27, 2020 at 5:24 PM:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on the PyFlink component, working on various important
features such as the Python UDF and Pandas integration. He keeps checking and voting
for our releases, has successfully produced two releases (1.9.3 & 1.11.1) as RM, and
is currently working as RM to push forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)



Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-27 Thread Zhijiang
Congrats! Thanks Zhu Zhu for the release manager work, and thanks to everyone involved!

Best,
Zhijiang
--
From:liupengcheng 
Send Time:2020 Aug. 26 (Wed.) 19:37
To:dev ; Xingbo Huang 
Cc:Guowei Ma ; user-zh ; Yangze 
Guo ; Dian Fu ; Zhu Zhu 
; user 
Subject:Re: [ANNOUNCE] Apache Flink 1.10.2 released

Thanks ZhuZhu for managing this release and everyone who contributed to this.

Best,
Pengcheng

 On 2020/8/26 7:06 PM, "Congxian Qiu" wrote:

Thanks ZhuZhu for managing this release and everyone else who contributed
to this release!

Best,
Congxian


Xingbo Huang wrote on Wed, Aug 26, 2020 at 1:53 PM:

> Thanks Zhu for the great work and everyone who contributed to this 
release!
>
> Best,
> Xingbo
>
> Guowei Ma wrote on Wed, Aug 26, 2020 at 12:43 PM:
>
>> Hi,
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Thanks to everyone who contributed to this!
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>>
>>> Thanks for Zhu's work to manage this release and everyone who
>>> contributed to this!
>>>
>>> Best,
>>> Yun Tang
>>> 
>>> From: Yangze Guo 
>>> Sent: Tuesday, August 25, 2020 14:47
>>> To: Dian Fu 
>>> Cc: Zhu Zhu ; dev ; user <
>>> user@flink.apache.org>; user-zh 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>>
>>> Thanks a lot for being the release manager Zhu Zhu!
>>> Congrats to all others who have contributed to the release!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>>> >
>>> > Thanks ZhuZhu for managing this release and everyone else who
>>> contributed to this release!
>>> >
>>> > Regards,
>>> > Dian
>>> >
>>> > On Aug 25, 2020, at 2:22 PM, Till Rohrmann wrote:
>>> >
>>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>>> all others who have contributed to the release!
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.10.2, which is the first bugfix release for the Apache 
Flink
>>> 1.10 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data 
streaming
>>> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >>
>>> 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
>>> >
>>> >
>>>
>>



Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Zhijiang
Thanks for being the release manager and for the efficient work, Dian!

Best,
Zhijiang


--
From:Konstantin Knauf 
Send Time:2020 Jul. 22 (Wed.) 19:55
To:Till Rohrmann 
Cc:dev ; Yangze Guo ; Dian Fu 
; user ; user-zh 

Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released

Thank you for managing the quick follow-up release. I think this was very
important for Table & SQL users.
On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann  wrote:

Thanks for being the release manager for the 1.11.1 release, Dian. Thanks a lot 
to everyone who contributed to this release.

Cheers,
Till
On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
Thanks Dian for the great work and thanks to everyone who makes this
 release possible!

 Best, Hequn

 On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:

 > Congratulations! Thanks Dian for the great work and to be the release
 > manager!
 >
 > Best,
 > Jark
 >
 > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
 >
 > > Congrats!
 > >
 > > Thanks Dian Fu for being release manager, and everyone involved!
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
 > wrote:
 > > >
 > > > Congratulations! Thanks Dian for the great work!
 > > >
 > > > Best,
 > > > Wei
 > > >
 > > > > On Jul 22, 2020, at 15:09, Leonard Xu wrote:
 > > > >
 > > > > Congratulations!
 > > > >
 > > > > Thanks Dian Fu for the great work as release manager, and thanks
 > > everyone involved!
 > > > >
 > > > > Best
 > > > > Leonard Xu
 > > > >
 > > > >> On Jul 22, 2020, at 14:52, Dian Fu wrote:
 > > > >>
 > > > >> The Apache Flink community is very happy to announce the release of
 > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
 > Flink
 > > 1.11 series.
 > > > >>
 > > > >> Apache Flink® is an open-source stream processing framework for
 > > distributed, high-performing, always-available, and accurate data
 > streaming
 > > applications.
 > > > >>
 > > > >> The release is available for download at:
 > > > >> https://flink.apache.org/downloads.html
 > > > >>
 > > > >> Please check out the release blog post for an overview of the
 > > improvements for this bugfix release:
 > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
 > > > >>
 > > > >> The full release notes are available in Jira:
 > > > >>
 > >
 > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
 > > > >>
 > > > >> We would like to thank all contributors of the Apache Flink
 > community
 > > who made this release possible!
 > > > >>
 > > > >> Regards,
 > > > >> Dian
 > > > >
 > > >
 > >
 >


-- 
Konstantin Knauf 
https://twitter.com/snntrable
https://github.com/knaufk 



[ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Zhijiang
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.0, which is the latest major release.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Piotr & Zhijiang

Re: Unaligned Checkpoint and Exactly Once

2020-06-21 Thread Zhijiang
From an implementation and logic-complexity perspective, AT_LEAST_ONCE is somewhat
simpler than EXACTLY_ONCE, with or without unaligned checkpoints, since
it can always process data without blocking any channels.


--
From:Lu Weizheng 
Send Time:2020 Jun. 22 (Mon.) 10:53
To:Zhijiang ; user@flink.apache.org 

Subject:Re: Unaligned Checkpoint and Exactly Once

 Thank you Zhijiang! The second question about the config is just because I found a
method in InputProcessorUtil. I guess AT_LEAST_ONCE mode is a simpler way to
handle checkpoint barriers?

private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            int numInputChannels = Arrays.stream(inputGates)
                    .mapToInt(InputGate::getNumberOfInputChannels)
                    .sum();
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException(
                    "Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}

From: Zhijiang
Sent: 2020 Jun. 22 10:41
To: Lu Weizheng ; user@flink.apache.org

Subject: Re: Unaligned Checkpoint and Exactly Once
Hi Weizheng,

The unaligned checkpoint (UC) only supports the exactly-once mode in Flink 1.11, and
it is not used for savepoints. Savepoints are typically used in job-rescaling
scenarios, and we plan to support UC for them in a future release. UC can of course
satisfy the exactly-once semantic as promised.

Regarding the config issue, I am not sure I get your point here. The first config
describes whether the currently set mode (effectively only exactly-once) enables UC
or not, and the second config sets the mode itself (exactly-once or at-least-once).
I guess you are suggesting merging them into the first config's form, but they are
really two different dimensions of checkpoint configuration: one is the semantic of
the data-processing guarantee, and the other is which of two mechanisms we use to
realize one of those semantics (exactly-once).
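
To make the two dimensions concrete, here is a minimal sketch (the 60-second
checkpoint interval is an arbitrary example value, not from this thread):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // checkpoint every 60 seconds

        // Dimension 1: the data-processing guarantee (the semantic).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Dimension 2: the mechanism used to realize exactly-once
        // (unaligned instead of aligned barriers; available from Flink 1.11).
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}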


Best,
Zhijiang

--
From:Lu Weizheng 
Send Time:2020 Jun. 22 (Mon.) 07:20
To:user@flink.apache.org 
Subject:Unaligned Checkpoint and Exactly Once

 Hi there,

 The new feature in Flink 1.11 will provide us the Unaligned Checkpoint, which means
an operator subtask does not need to wait for all checkpoint barriers and will not
block any channels. As the checkpoint barrier is the key mechanism for the
exactly-once guarantee, I am not sure whether the Unaligned Checkpoint can still
achieve exactly-once, or only at-least-once?

FLIP-76 :
Unaligned checkpoints will initially be an optional feature. After collecting 
experience and implementing all necessary extensions, unaligned checkpoint will 
probably be enabled by default for exactly once.

 What's more, in the following two configs,

 Config 1
env.getCheckpointConfig().enableUnalignedCheckpoints();

 Config 2

checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

 Does Config 2 use an even simpler way of handling checkpoints than the Unaligned Checkpoint?

 Hope for replies!

 Weizheng




Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Zhijiang
Congratulations Yu! Well deserved!

Best,
Zhijiang


--
From:Dian Fu 
Send Time:2020 Jun. 17 (Wed.) 10:48
To:dev 
Cc:Haibo Sun ; user ; user-zh 

Subject:Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

Congrats Yu!

Regards,
Dian

> On Jun 17, 2020, at 10:35 AM, Jark Wu wrote:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's state backend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> He keeps checking and voting for our releases, and has also successfully
>>> produced two releases (1.10.0 & 1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



Re: Blocked requesting MemorySegment when Segments are available.

2020-06-10 Thread Zhijiang
Hi David,

I want to clarify two things firstly based on the info you provided below.

1. If all the tasks are running on the same TaskManager, there is no credit-based
flow control involved. The downstream operator consumes the upstream's data directly
in memory; no network shuffle is needed.
2. If the TaskManager has available buffers, that does not mean an individual task
has available buffers on its input or output side. E.g. the output side of the
"enrich-events" operator has at most
10 buffers. Once these buffers are exhausted, the operator blocks regardless of the
available buffers at the TaskManager level.

Considering your case, could you double-check whether buffers are accumulating in
the output ("outputQueueLength" metric) of the "enrich-events" operator, and whether
the "numRecordsIn/numBytesIn" metrics of the "Test Function" operator are greater
than 0? I want to rule out a buffer leak on the upstream side and a missing
partition request on the downstream side. Then we can further investigate whether
the input-availability notification on the downstream side has a bug that makes it
stuck forever.

Best,
Zhijiang


--
From:David Maddison 
Send Time:2020 Jun. 9 (Tue.) 19:28
To:user 
Subject:Blocked requesting MemorySegment when Segments are available.

Hi,

I keep seeing the following situation where a task is blocked getting a
MemorySegment from the pool, but the TaskManager reports that it has lots
of MemorySegments available.

I'm completely stumped as to how to debug or what to look at next so any 
hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operators; the first one, "enrich-events", is stuck forever
attempting to get a memory segment to send to the downstream operator "Test
function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 
tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xd2206000> (a 
java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
 at 
org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


All the operator tasks are running on the same TaskManager and the TaskManager 
reports that it has 6,517 memory segments available, so it's confusing why the 
task would be blocked getting a memory segment.

Memory Segments
Type  Count
Available  6,517
Total  6,553

Even more confusing is that the downstream task appears to be waiting for data,
and therefore I would assume that credit-based flow control isn't causing
the backpressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c 
waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xc91de1d0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(Mailbox

Re: Flink 1.10 memory and backpressure

2020-06-10 Thread Zhijiang
Sorry, I missed the document link [1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/back_pressure.html


--
From:Zhijiang 
Send Time:2020 Jun. 11 (Thu.) 11:32
To:Steven Nelson ; user 
Subject:Re: Flink 1.10 memory and backpressure

Regarding monitoring the backpressure, you can refer to the documentation [1].

As for debugging the backpressure, one option is to trace the jstack of the
respective window task thread that causes the backpressure (typically the one with
the most in-queue buffers).
By tracing the jstack repeatedly, you might find which operation (e.g. state access)
costs the most, and thereby locate the bottleneck.

Besides that, release 1.11 introduces the unaligned checkpoint, implemented mainly
to resolve the checkpointing issues in the case of backpressure. You may want to pay
attention to this feature and give it a try for your case.

Best,
Zhijiang


--
From:Steven Nelson 
Send Time:2020 Jun. 11 (Thu.) 04:35
To:user 
Subject:Flink 1.10 memory and backpressure

We are working with a process and having some problems with backpressure.

The backpressure seems to be caused by a simple Window operation, which causes 
our checkpoints to fail.

What would be the recommendations for debugging the backpressure?




Re: Single task backpressure problem with Credit-based Flow Control

2020-05-24 Thread Zhijiang
Hi Weihua,

From the info below, this matches the expected behavior of credit-based flow control.

I guess one of the Sink parallel subtasks causes the backpressure, so you will see
that there are no available credits on the Sink side and
the outPoolUsage of Map is almost 100%. That correctly reflects the credit-based
state in the case of backpressure.

If you want to analyze the root cause of the backpressure, you can trace the task
stack of the respective Sink subtask to find which operation costs the most;
then you can increase the parallelism or improve the UDF (if it is the bottleneck)
and try again. In addition, I am not sure why you chose rescale to shuffle data
among operators. The default
forward mode gives really good performance if you adjust the operators to the same
parallelism.
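
As a minimal sketch of that suggestion (the Source/Map/Sink function classes are
hypothetical placeholders; the parallelism values follow the DAG in your mail):

// With equal parallelism and no explicit rescale()/rebalance(), Flink connects
// the operators with the forward strategy and can chain them into a single
// task, so records are handed over locally without any network shuffle.
env.addSource(new MySourceFunction()).setParallelism(1000)
   .map(new MyMapFunction()).setParallelism(1000)
   .addSink(new MySinkFunction()).setParallelism(1000);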

Best,
Zhijiang
--
From:Weihua Hu 
Send Time:2020 May 24 (Sun.) 18:32
To:user 
Subject:Single task backpressure problem with Credit-based Flow Control

Hi, all

I ran into a weird single Task BackPressure problem.

JobInfo:
DAG: Source (1000) -> Map (2000) -> Sink (1000), linked via rescale.
Flink version: 1.9.0
There is no related info in the jobmanager/taskmanager logs.

Through metrics, I see that Map (242)'s outPoolUsage is full, but its
downstream Sink (121)'s inPoolUsage is 0.

After dumping the memory and analyzing it, I found:
Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
This is not consistent with my understanding of the Flink network transmission 
mechanism.

Can someone help me? Thanks a lot.


Best
Weihua Hu



Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-18 Thread Zhijiang
Thanks Yu for the release manager work, and thanks to everyone involved.

Best,
Zhijiang
--
From:Arvid Heise 
Send Time:2020 May 18 (Mon.) 23:17
To:Yangze Guo 
Cc:dev ; Apache Announce List ; 
user ; Yu Li ; user-zh 

Subject:Re: [ANNOUNCE] Apache Flink 1.10.1 released

Thank you very much!

On Mon, May 18, 2020 at 8:28 AM Yangze Guo  wrote:
Thanks Yu for the great job. Congrats everyone who made this release possible.
 Best,
 Yangze Guo

 On Mon, May 18, 2020 at 10:57 AM Leonard Xu  wrote:
 >
 >
 > Thanks Yu for being the release manager, and everyone else who made this 
 > possible.
 >
 > Best,
 > Leonard Xu
 >
 > On May 18, 2020, at 10:43, Zhu Zhu wrote:
 >
 > Thanks Yu for being the release manager. Thanks everyone who made this 
 > release possible!
 >
 > Thanks,
 > Zhu Zhu
 >
 > Benchao Li wrote on Fri, May 15, 2020 at 7:51 PM:
 >>
 >> Thanks Yu for the great work, and everyone else who made this possible.
 >>
 >> Dian Fu wrote on Fri, May 15, 2020 at 6:55 PM:
 >>>
 >>> Thanks Yu for managing this release and everyone else who made this 
 >>> release possible. Good work!
 >>>
 >>> Regards,
 >>> Dian
 >>>
 >>> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
 >>>
 >>> Thanks Yu for being our release manager and everyone else who made the 
 >>> release possible!
 >>>
 >>> Cheers,
 >>> Till
 >>>
 >>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu  
 >>> wrote:
 >>>>
 >>>> Thanks a lot for the release and your great job, Yu!
 >>>> Also thanks to everyone who made this release possible!
 >>>>
 >>>> Best,
 >>>> Congxian
 >>>>
 >>>>
 >>>> Yu Li wrote on Thu, May 14, 2020 at 1:59 AM:
 >>>>>
 >>>>> The Apache Flink community is very happy to announce the release of 
 >>>>> Apache Flink 1.10.1, which is the first bugfix release for the Apache 
 >>>>> Flink 1.10 series.
 >>>>>
 >>>>> Apache Flink® is an open-source stream processing framework for 
 >>>>> distributed, high-performing, always-available, and accurate data 
 >>>>> streaming applications.
 >>>>>
 >>>>> The release is available for download at:
 >>>>> https://flink.apache.org/downloads.html
 >>>>>
 >>>>> Please check out the release blog post for an overview of the 
 >>>>> improvements for this bugfix release:
 >>>>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
 >>>>>
 >>>>> The full release notes are available in Jira:
 >>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
 >>>>>
 >>>>> We would like to thank all contributors of the Apache Flink community 
 >>>>> who made this release possible!
 >>>>>
 >>>>> Regards,
 >>>>> Yu
 >>>
 >>>
 >>
 >>
 >> --
 >>
 >> Benchao Li
 >> School of Electronics Engineering and Computer Science, Peking University
 >> Tel:+86-15650713730
 >> Email: libenc...@gmail.com; libenc...@pku.edu.cn
 >
 >


-- 
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng



Re: Flink Streaming Job Tuning help

2020-05-12 Thread Zhijiang
Hi Kumar,

I can give some general ideas for further analysis. 

> We are finding that flink lags seriously behind when we introduce the keyBy 
> (presumably because of shuffle across the network)
The `keyBy` breaks the operator chain, so it can bring an obvious performance hit
in practice. I guess if your previous setup without keyBy could make use of the
chaining mechanism,
the follow-up operator consumed the emitted records from the preceding operator
directly, without going through the buffer serialization -> network shuffle ->
buffer deserialization steps;
this matters especially since your record size of 10k is a bit large.

If the keyBy is necessary in your case, you can then check for the current
bottleneck, e.g. whether there is backpressure, which you can monitor from the web
UI. If so, find which task is the
bottleneck causing the backpressure, and trace it via the network-related metrics.

Also check whether there is data skew in your case, i.e. some tasks processing many
more records than others. If so, increasing the parallelism may help balance the
load.

Best,
Zhijiang
--
From:Senthil Kumar 
Send Time:2020年5月13日(星期三) 00:49
To:user@flink.apache.org 
Subject:Re: Flink Streaming Job Tuning help

I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers etc.),
but before we spend way too much time on
this, would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar



Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Zhijiang
Thanks Dian for the release work, and thanks to everyone involved.

Best,
Zhijiang


--
From:Till Rohrmann 
Send Time:2020 Apr. 27 (Mon.) 15:13
To:Jingsong Li 
Cc:dev ; Leonard Xu ; Benchao Li 
; Konstantin Knauf ; jincheng 
sun ; Hequn Cheng ; Dian Fu 
; user ; user-zh 
; Apache Announce List 
Subject:Re: [ANNOUNCE] Apache Flink 1.9.3 released

Thanks Dian for being our release manager and thanks to everyone who helped
making this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> Great
>> > work!
>> >
>> > Konstantin Knauf wrote on Sun, Apr 26, 2020 at 5:21 PM:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>> >>> Hequn Cheng wrote on Sat, Apr 25, 2020 at 8:30 PM:
>> >>>
>> >>>> @Dian, thanks a lot for the release and for being the release
>> manager.
>> >>>> Also thanks to everyone who made this release possible!
>> >>>>
>> >>>> Best,
>> >>>> Hequn
>> >>>>
>> >>>> On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> >>>>
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> The Apache Flink community is very happy to announce the release of
>> >>>>> Apache Flink 1.9.3, which is the third bugfix release for the
>> Apache Flink
>> >>>>> 1.9 series.
>> >>>>>
>> >>>>> Apache Flink® is an open-source stream processing framework for
>> >>>>> distributed, high-performing, always-available, and accurate data
>> streaming
>> >>>>> applications.
>> >>>>>
>> >>>>> The release is available for download at:
>> >>>>> https://flink.apache.org/downloads.html
>> >>>>>
>> >>>>> Please check out the release blog post for an overview of the
>> >>>>> improvements for this bugfix release:
>> >>>>> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >>>>>
>> >>>>> The full release notes are available in Jira:
>> >>>>> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >>>>>
>> >>>>> We would like to thank all contributors of the Apache Flink
>> community
>> >>>>> who made this release possible!
>> >>>>> Also great thanks to @Jincheng for helping finalize this release.
>> >>>>>
>> >>>>> Regards,
>> >>>>> Dian
>> >>>>>
>> >>>>
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica <https://www.ververica.com/>
>> >>
>> >> --
>> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking
>> University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>



Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-09 Thread Zhijiang
Great work! Thanks Gordon for the continuous efforts in enhancing Stateful Functions
and for the efficient release!
May Stateful Functions become more and more popular among users.

Best,
Zhijiang


--
From:Yun Tang 
Send Time:2020 Apr. 9 (Thu.) 00:17
To:Till Rohrmann ; dev 
Cc:Oytun Tez ; user 
Subject:Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Excited to see the stateful functions release!
Thanks for the great work of release manager Gordon and everyone who contributed
to this.

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, April 8, 2020 14:30
To: dev 
Cc: Oytun Tez ; user 
Subject: Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Great news! Thanks a lot for being our release manager Gordon and to everyone 
who helped with the release.

Cheers,
Till

On Wed, Apr 8, 2020 at 3:57 AM Congxian Qiu wrote:
Thanks a lot for the release and your great job, Gordon!
Also thanks to everyone who made this release possible!

Best,
Congxian


Oytun Tez wrote on Wed, Apr 8, 2020 at 2:55 AM:

> I should also add, I couldn't agree more with this sentence in the release
> article: "state access/updates and messaging need to be integrated."
>
> This is something we strictly enforce in our Flink case, where we do not
> refer to anything external for storage, use Flink as our DB.
>
>
>
>  --
>
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>
>
> On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez wrote:
>
>> Great news! Thank you all.
>>
>> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira wrote:
>>
>>> Thank you for managing the release, Gordon — you did a tremendous job!
>>> And to everyone else who worked on pushing it through.
>>>
>>> Really excited about the new use cases that StateFun 2.0 unlocks for
>>> Flink users and beyond!
>>>
>>>
>>> Marta
>>>
>>>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng wrote:
>>>
>>>> Thanks a lot for the release and your great job, Gordon!
>>>> Also thanks to everyone who made this release possible!
>>>>
>>>> Best,
>>>> Hequn
>>>>
>>>>> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai wrote:
>>>>
>>>>> The Apache Flink community is very happy to announce the release of
>>>>> Apache Flink Stateful Functions 2.0.0.
>>>>>
>>>>> Stateful Functions is an API that simplifies building distributed
>>>>> stateful applications.
>>>>> It's based on functions with persistent state that can interact
>>>>> dynamically with strong consistency guarantees.
>>>>>
>>>>> Please check out the release blog post for an overview of the release:
>>>>> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>>>>>
>>>>> The release is available for download at:
>>>>> https://flink.apache.org/downloads.html
>>>>>
>>>>> Maven artifacts for Stateful Functions can be found at:
>>>>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>>>>
>>>>> Python SDK for Stateful Functions published to the PyPI index can be
>>>>> found at:
>>>>> https://pypi.org/project/apache-flink-statefun/
>>>>>
>>>>> Official Docker image for building Stateful Functions applications is
>>>>> currently being published to Docker Hub.
>>>>> Dockerfiles for this release can be found at:
>>>>> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
>>>>> Progress for creating the Docker Hub repository can be tracked at:
>>>>> https://github.com/docker-library/official-images/pull/7749
>>>>>
>>>>> The full release notes are available in Jira:
>>>>>
>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>>>>>
>>>>> We would like to thank all contributors of the Apache Flink community
>>>>> who made this release possible!
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>> --
>>  --
>>
>> Oytun Tez
>> M O T A W O R D | CTO & Co-Founder
>> oy...@motaword.com
>>
>>
>



Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Zhijiang

Thanks for the continuous efforts in building out the Flink ecosystem, Jeff!
Glad to see the steady progress. I hope many users try it out in practice.

Best,
Zhijiang



--
From:Dian Fu 
Send Time:2020 Mar. 31 (Tue.) 10:15
To:Jeff Zhang 
Cc:user ; dev 
Subject:Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

Hi Jeff,

Thanks for the great work and sharing it with the community! Very impressive 
and will try it out.

Regards,
Dian

On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote:
This is great news Jeff! Thanks a lot for sharing it with the community. 
Looking forward trying Flink on Zeppelin out :-)

Cheers,
Till
On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang  wrote:
Hi Folks,

I am very excited to announce that the integration work of Flink on Apache Zeppelin
notebook is completed. You can now run Flink jobs via the DataStream API, Table API,
SQL, and PyFlink in an Apache Zeppelin notebook. Download it here:
http://zeppelin.apache.org/download.html

Here's some highlights of this work

1. Support 3 kinds of execution modes: local, remote, yarn
2. Support multiple languages in one flink session: scala, python, sql
3. Support hive connector (reading from hive and writing to hive) 
4. Dependency management
5. UDF support (scala, pyflink)
6. Support both batch sql and streaming sql

For more details and usage instructions, you can refer following 4 blogs

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

You are welcome to try Flink on Zeppelin and to give feedback and comments.

-- 
Best Regards

Jeff Zhang



Re: End to End Latency Tracking in flink

2020-03-29 Thread Zhijiang
Hi Lu,

Besides Congxian's replies, you can also find some further explanation at
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Best,
Zhijiang


--
From:Congxian Qiu 
Send Time:2020 Mar. 28 (Sat.) 11:49
To:Lu Niu 
Cc:user 
Subject:Re: End to End Latency Tracking in flink

Hi
As far as I know, the latency-tracking feature is meant for debugging: you can use
it to debug, and disable it when running the job in production.
From my side, using $current_processing - $event_time is okay, but keep in mind
that the event time may not be the time the record was ingested into Flink.
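
As a rough sketch of that approach (MyEvent and getEventTimestamp() are hypothetical
stand-ins for the actual event type, and the metric name is just an example):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass-through operator, typically placed right before the sink, that
// reports (current processing time - event time) in ms as a gauge.
public class EventTimeLagGauge extends RichMapFunction<MyEvent, MyEvent> {

    private transient volatile long lastLagMillis;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup()
                .gauge("eventTimeLag", () -> lastLagMillis);
    }

    @Override
    public MyEvent map(MyEvent event) {
        // As noted above, the event time may predate ingestion into Flink.
        lastLagMillis = System.currentTimeMillis() - event.getEventTimestamp();
        return event;
    }
}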

Best,
Congxian

Lu Niu wrote on Sat, Mar 28, 2020 at 6:25 AM:

Hi,

I am looking for end-to-end latency monitoring of a Flink job. Based on my study,
I have two options:

1. Flink provides a latency-tracking feature. However, the documentation says it
cannot show the actual latency of the business logic, as it bypasses all operators:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking
Also, the feature can significantly impact performance, so I assume it's not meant
for use in production. What do users use latency tracking for? It sounds like only
backpressure could affect that latency.

2. I found another Stack Overflow question on this:
https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application
The answer suggests exposing (current processing time - event time) after the source
and before the sink for end-to-end latency monitoring. Is this a good solution? If
not, what's the official solution for end-to-end latency tracking?

Thank you! 

Best
Lu




Re: FlinkCEP - Detect absence of a certain event

2020-03-18 Thread Zhijiang
Hi Humberto,

I guess Fuji is familiar with Flink CEP and can answer your question; I have
already cc'd him.

Best,
Zhijiang


--
From:Humberto Rodriguez Avila 
Send Time:2020 Mar. 18 (Wed.) 17:31
To:user 
Subject:FlinkCEP - Detect absence of a certain event

In the documentation of FlinkCEP, I found that I can enforce that a particular 
event doesn't occur between two other events using notFollowedBy or notNext.
However, I was wondering if I could detect the absence of a certain event after 
a time X. For example, if an event A is not followed by another event A within 
10 seconds, fire an alert or do something.
Would it be possible to define a FlinkCEP pattern to capture that situation?
Thanks in advance, Humberto
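
For reference, a common alternative outside CEP is a keyed process function with an 
event-time timer: each event registers a timer 10 seconds ahead and cancels the 
previous one, so the timer only fires when no follow-up event arrived. This is a 
minimal sketch; the `Event` type and the 10 second window are illustrative, and it 
assumes timestamps/watermarks are assigned upstream.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AbsenceDetector extends KeyedProcessFunction<String, Event, String> {

    private transient ValueState<Long> lastTimer;

    @Override
    public void open(Configuration parameters) {
        lastTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        // Cancel the timer registered for the previous event of this key, if any.
        Long previous = lastTimer.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);
        }
        long timer = ctx.timestamp() + 10_000L;
        ctx.timerService().registerEventTimeTimer(timer);
        lastTimer.update(timer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Firing means no follow-up event arrived within 10 seconds.
        out.collect("No follow-up event for key " + ctx.getCurrentKey() + " within 10s");
        lastTimer.clear();
    }
}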



Re: How do I get the outPoolUsage value inside my own stream operator?

2020-03-18 Thread Zhijiang
Hi Felipe,

I checked the code path, and the metric of outPoolUsage is under the following 
layer: TaskMetricGroup -> TaskIOMetricGroup -> "buffers" group -> 
"outPoolUsage".
It seems that you missed the `TaskIOMetricGroup` in the samples below. You can 
get it from the TaskMetricGroup.

Hope it solves your problem.

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2020 Mar. 17 (Tue.) 17:50
To:user 
Subject:Re: How do I get the outPoolUsage value inside my own stream operator?

Hi,

just for the record. I am collecting the values of outPoolUsage using
the piece of code below inside my stream operator

// Walk up from the operator's metric group to the task-level "buffers" group.
OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup) this.getMetricGroup();
TaskMetricGroup taskMetricGroup = operatorMetricGroup.parent();
MetricGroup metricGroup = taskMetricGroup.getGroup("buffers");
// The gauge is registered by the network stack and may be absent early on.
Gauge<Float> gauge = (Gauge<Float>) metricGroup.getMetric("outPoolUsage");
if (gauge != null && gauge.getValue() != null) {
    float outPoolUsage = gauge.getValue().floatValue();
    this.outPoolUsageHistogram.update((long) (outPoolUsage * 100));
}


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Mar 16, 2020 at 5:17 PM Felipe Gutierrez
 wrote:
>
> Hi community,
>
> I have built my own operator (not a UDF) and I want to collect the
> metrics of "outPoolUsage" inside it. How do I do it assuming that I
> have to do some modifications in the source code?
>
> I know that the Gouge comes from
> flink-runtime/org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.java.
> Inside of my operator MyAbstractUdfStreamOperator I can get the
> "MetricGroup metricGroup = this.getMetricGroup()".
> Then I implemented the "Gauge<Float> gauge = (Gauge<Float>)
> metricGroup.getMetric("outPoolUsage");" but it returns null all the
> time. Even when I click on the Backpressure UI Interface.
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com



Re: Help me understand this Exception

2020-03-18 Thread Zhijiang
Agree with Gordon's below explanation!

Besides that, maybe you can also check the job master's log which might 
probably show the specific exception to cause this failure.

I was wondering whether it is necessary to improve 
ExceptionInChainedOperatorException to also provide the message from the 
wrapped real exception,
so that users can easily see the root cause directly, instead of only the current 
message "Could not forward element to next operator".

Best,
Zhijiang


--
From:Tzu-Li (Gordon) Tai 
Send Time:2020 Mar. 18 (Wed.) 00:01
To:aj 
Cc:user 
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain 
failed to process the output watermark.
There should be another exception, which would explain why some operator was 
closed / failed, eventually leading to the exception above.
That would provide more insight into exactly why your job is failing.

Cheers,
Gordon
On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:

Hi,
I am running a streaming job that generates watermarks like this:

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        LOGGER.info("timestamp {}", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp {}", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output 
watermark: 
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWat

Re: Backpressure and 99th percentile latency

2020-03-07 Thread Zhijiang
Thanks for the feedback Felipe! 
Regarding your concern below:

> Although I think it is better to use outPoolUsage and inPoolUsage according 
> to [1]. However, in your opinion is it better (faster to see) to use 
> inputQueueLength and
> outputQueueLength or outPoolUsage and inPoolUsage to monitor a consequence of 
> backpressure? I mean, is there a faster way to show that the latency 
> increased due to
> backpressure? Maybe if I create my own metric on my own operator or udf?

The blog [1] already gives a great explanation of the network stack for users in 
general, and I share the consensus on this issue.
 In particular, I can provide some further notes for your understanding.

1. It is not easy for users to get the precise total amount of input & output 
buffers, so from the metrics of inputQueueLength & outputQueueLength alone we 
cannot tell whether the input & output buffers are exhausted and backpressure 
has happened. In contrast, we can easily know that inPoolUsage & 
outPoolUsage should both reach 100% once backpressure happens.

2. The inputPoolUsage has a different semantic from release-1.9. Before 1.9 
this metric only measured the usage of floating buffers, but from 1.9 it 
also covers the usage of exclusive buffers. That means from 1.9 you might 
see the inputPoolUsage far from 100% when backpressure happens, especially in 
the data-skew case, but the inputFloatingBufferUsage should be 100% instead.

3. The latency marker provided by the Flink framework is emitted to a random 
channel (non-broadcast) every time because of performance concerns. So it is 
hard to say whether it is measuring a heavy-load channel or a lightweight 
channel within a short while, especially in a data-skew scenario.

4. In theory the latency should increase along with the trend of increasing 
input & output queue lengths and pool usages. All of them should be 
proportional and have the same trend in most cases. 

Best,
Zhijiang




--
From:Felipe Gutierrez 
Send Time:2020 Mar. 7 (Sat.) 18:49
To:Arvid Heise 
Cc:Zhijiang ; user 
Subject:Re: Backpressure and 99th percentile latency

Hi,
I implemented my own histogram metric on my operator to measure the
latency. The latency is following the throughput at the same pace now.
The figures are attached.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Mar 6, 2020 at 9:38 AM Felipe Gutierrez
 wrote:
>
> Thanks for the clarified answer @Zhijiang, I am gonna monitor
> inputQueueLength and outputQueueLength to check some relation with
> backpressure. Although I think it is better to use outPoolUsage and
> inPoolUsage according to [1].
> However, in your opinion is it better (faster to see) to use
> inputQueueLength and outputQueueLength or outPoolUsage and inPoolUsage
> to monitor a consequence of backpressure? I mean, is there a faster
> way to show that the latency increased due to backpressure? Maybe if I
> create my own metric on my own operator or udf?
>
> Thanks @Arvid. In the end I want to be able to hold SLAs. For me, the
> SLA would be the minimum latency. If I understood correctly, in the
> time that I started to have backpressure the latency track metrics are
> not a very precise indication of how much backpressure my application
> is suffering. It just indicates that there is backpressure.
> What would you say that is more less precise metric to tune the
> throughput in order to not have backpressure. Something like, if I
> have 50,000 milliseconds of latency and the normal latency is 150
> milliseconds, the throughput has to decrease by a factor of 50,000/150
> times.
>
> Just a note. I am not changing the throughput of the sources yet. I am
> changing the size of the window without restart the job. But I guess
> they have the same meaning for this question.
>
> [1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Fri, Mar 6, 2020 at 8:17 AM Arvid Heise  wrote:
> >
> > Hi Felipe,
> >
> > latency under backpressure has to be carefully interpreted. Latency's 
> > semantics actually require that the data source is read in a timely manner; 
> > that is, there is no bottleneck in your pipeline where data is piling up.
> >
> > Thus, to measure latency in experiments you must ensure that the current 
> > throughput is below the maximum throughput, for example by gradually 
> > increasing the throughput with a generating source or through some 
> > throttles on the external source. Until you reach the maximum throughput, 
> > latencies semantics is exactly 

Re: Backpressure and 99th percentile latency

2020-03-05 Thread Zhijiang
Hi Felipe,

Try to answer your below questions.

> I understand that I am tracking latency every 10 seconds for each physical 
> instance operator. Is that right?

Generally right. The latency marker is emitted from the source and flows through all 
the intermediate operators until the sink. This interval controls the emitting 
frequency at the source.
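
For reference, a minimal sketch of that configuration (the 10 second value is just an example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Each source emits a latency marker every 10 seconds.
env.getConfig().setLatencyTrackingInterval(10_000L);

The granularity is set separately via metrics.latency.granularity in flink-conf.yaml.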

> The backpressure goes away but the 99th percentile latency is still the same. 
> Why? Does it have no relation with each other?

The latency might be influenced by the buffer flush timeout, network transport and 
load, etc. In the case of backpressure, there is a huge amount of in-flight data 
accumulated on the network wire, so the latency marker queues up waiting for 
network transport, which can bring obvious delay. The latency marker may not even 
be emitted in time from the source, because temporarily no buffers are available. 

After the backpressure goes away, that does not mean there are no accumulated 
buffers left on the network wire, just that they no longer reach the degree of 
backpressure. So the latency marker still needs to queue behind accumulated 
buffers on the wire, and it might take some time to digest the previously 
accumulated buffers completely before the latency relaxes. I guess this might be 
your case. You can monitor the metrics of "inputQueueLength" and 
"outputQueueLength" to confirm the status. Anyway, the answer is yes, it is 
related to backpressure, but there might be some delay before you see the 
changes clearly.

>In the end I left the experiment for more than 2 hours running and only after 
>about 1,5 hour the 99th percentile latency got down to milliseconds. Is that 
>normal?

I guess it is normal, as mentioned above. Once there are no accumulated 
buffers left in the network stack and no backpressure at all, the latency should 
go down to milliseconds.

Best,
Zhijiang
--
From:Felipe Gutierrez 
Send Time:2020 Mar. 6 (Fri.) 05:04
To:user 
Subject:Backpressure and 99th percentile latency

Hi,

I am a bit confused about the topic of tracking latency in Flink [1]. It says that 
if I use latency tracking I am measuring Flink's network stack, but application 
code latencies can also influence it. For instance, suppose I am using 
metrics.latency.granularity: operator (the default) and 
setLatencyTrackingInterval(10000). I understand that I am then tracking latency 
every 10 seconds for each physical operator instance. Is that right?

In my application, I am tracking the latency of all aggregators. When I have a 
high workload and can see backpressure in the Flink UI, the 99th percentile 
latencies are 13, 25, 21, and 25 seconds. Then I set my aggregator to use a 
larger window. The backpressure goes away, but the 99th percentile latency is 
still the same. Why? Do they have no relation to each other?

In the end I left the experiment running for more than 2 hours, and only after 
about 1.5 hours did the 99th percentile latency get down to milliseconds. Is that 
normal? Please see the attached figure.

[1] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: Question: Determining Total Recovery Time

2020-02-26 Thread Zhijiang
Hi Morgan,

Your idea is great and I am also interested in it.
 I think it is valuable for some users to estimate the maximum throughput 
capacity based on certain metrics or models.
But I am not quite sure whether it is feasible based on the existing 
metrics; at least some limitations exist, as Arvid mentioned.

If I understand correctly, the maximum throughput you want to measure is based 
on source emitting,
 which means some data is still buffered mid-topology and not processed yet. 
If so, we might refer to the metrics of `inputQueueLength`
and `inPoolUsage` together. Note that if the `inPoolUsage` reaches 100%, it does not 
mean all the buffers are already filled with data; it just means
all the available buffers have been requested. So `inputQueueLength` would be 
more precise for predicting the available capacity if we are aware of the
total buffer amount. In general we can make use of these two together.

 We can find the largest value of the above metrics across all the topology tasks, 
which probably hints at the bottleneck in the whole view. Then we can estimate
how many available buffers are left to hold more source-emitting throughput. 
But there is a limitation if all the metrics are `0` in a light-load situation,
as I mentioned above: we cannot estimate the saturation unless we 
increase the source emit rate.

Looking forward to hearing good news from you!

Best,
Zhijiang


--
From:Arvid Heise 
Send Time:2020 Feb. 26 (Wed.) 22:29
To:Morgan Geldenhuys 
Cc:Timo Walther ; user 
Subject:Re: Question: Determining Total Recovery Time

Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings 
(which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect the 
bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer sizes 
and the insights would help us.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service
On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys 
 wrote:

 Hi Arvid, Timo,

 Really appreciate the feedback. I have one final question on this topic and 
hope you don't mind me posing it to you directly. I posted the question earlier 
to the mailing list, but am looking at this more from an academic perspective 
as opposed to manually optimizing a specific job for a specific production 
environment. I do not know the Flink internals well enough to determine if I 
can accomplish what I am looking for.

 For an experiment, I need to work out the Total Recovery Time (TRT). I define 
this as the time it takes the system to "catch up" to the current timestamp 
assuming event time processing after a node failure.

 I would like to follow a heuristic approach which is: 

- job+environment agnostic, 
- does not involve load testing, 
- does not involve modifying the job or Flink codebase, and 
- relies solely on the metrics supplied.

As far as I know (and correct me if I'm wrong): TRT = heartbeat.timeout + 
recoveryTime + time to reprocess uncheckpointed messages + lag to catch up to 
the current timestamp.

 In order to predict TRT, I need some kind of resource utilization model based 
on the current processing capacity and maximum processing limit, let me explain:

Backpressure is essentially the point at which utilization has reached 100% for 
any particular streaming pipeline, and means that the application has reached 
the maximum number of messages that it can process per second. 
Let's consider an example: the system is running along perfectly fine under 
normal conditions, accessing external sources, and processing at an average of 
100,000 messages/sec. Let's assume the maximum capacity is around 130,000 
messages/sec before backpressure starts propagating back up the 
stream. Therefore, utilization is at 0.76 (100K/130K). Great, but at present we 
don't know that 130,000 is the limit without load testing. 
For this example, is there a way of finding this maximum capacity (and hence 
the utilization) without pushing the system to its limit, based solely on the 
average current throughput? Possibly by measuring the saturation of certain 
buffers between the operators? 

 If this is possible, the unused utilization can be used to predict how fast a 
system would get back to the current timestamp. Again, it's a heuristic, so it 
doesn't have to be extremely precise. Any hints would be greatly appreciated.

 Thank you very much!

 Regards,
 Morgan.

On 21.02.20 14:44, Arvid Heise wrote:
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure that 
the same task is processing the same record though.

Local copy needs to be state or else the last message would be lost upon 
restart. Performance will take a hit but if that is significa

Re: How JobManager and TaskManager find each other?

2020-02-26 Thread Zhijiang
I guess you are referring to the data shuffle process among different task 
managers.

While a task manager (TM) registers itself with the job manager (JM), it also 
carries the info of the IP address and data port that it listens on.
During the process of scheduling tasks, the upstream TM's address info (IP, 
port) is included in the task
 deployment descriptor for the respective downstream tasks. Then the downstream 
tasks can connect to the remote upstream TM
to request data.

In short, the JM knows all the addresses of the TMs via registration, and these 
addresses are sent to the required peers during task scheduling and 
deployment.

Best,
Zhijiang


--
From:KristoffSC 
Send Time:2020 Feb. 26 (Wed.) 19:39
To:user 
Subject:Re: How JobManager and TaskManager find each other?

Thanks all for the answers,

One more question though. In [1] we can see that task managers talk
to each other, sending data streams. How does each task manager know the
addresses of the other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Zhijiang
Congrats Jingsong! Welcome on board!

Best,
Zhijiang


--
From:Zhenghua Gao 
Send Time:2020 Feb. 21 (Fri.) 12:49
To:godfrey he 
Cc:dev ; user 
Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

Congrats Jingsong!


Best Regards,
Zhenghua Gao

On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:

Congrats Jingsong! Well deserved.

Best,
godfrey
Jeff Zhang  于2020年2月21日周五 上午11:49写道:
Congratulations!Jingsong. You deserve it 

wenlong.lwl  于2020年2月21日周五 上午11:43写道:
Congrats Jingsong!

 On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:

 > Congrats Jingsong!
 >
 > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
 > >
 > > Congratulations Jingsong! Well deserved.
 > >
 > > Best,
 > > Jark
 > >
 > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
 > >
 > >> Congratulations! Jingsong
 > >>
 > >>
 > >> Best,
 > >> Dan Zou
 > >>
 >
 >


-- 
Best Regards

Jeff Zhang



Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
BTW, FLIP-75 is aimed at the user experience of the web UI.
@Yadong Xie, have we already considered this issue of unifying the ids in different 
parts in FLIP-75? 

Best,
Zhijiang
--
From:Zhijiang 
Send Time:2020 Feb. 14 (Fri.) 13:03
To:Jiayi Liao 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re: Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

Let's move the further discussion onto the JIRA page. 
I do not have much time recently to work on this. If you want to take it, I 
can assign it to you and help review the PR if I have time. Otherwise I can find 
other possible contributors to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
Checkpoint UI last time. I've created an issue[1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent in different parts, maybe it is worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply using checkpoint subtask_id - 1 will find the 
corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that the subtask id of a job task starts from 0, while 
the checkpoint subtask id starts from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id; will simply using checkpoint subtask ID - 1 be OK?
Thanks







Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
Let's move the further discussion onto the JIRA page. 
I do not have much time recently to work on this. If you want to take it, I 
can assign it to you and help review the PR if I have time. Otherwise I can find 
other possible contributors to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
Checkpoint UI last time. I've created an issue[1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent in different parts, maybe it is worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply using checkpoint subtask_id - 1 will find the 
corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that the subtask id of a job task starts from 0, while 
the checkpoint subtask id starts from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id; will simply using checkpoint subtask ID - 1 be OK?
Thanks






Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
If the id is not consistent in different parts, maybe it is worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply using checkpoint subtask_id - 1 will find the 
corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that the subtask id of a job task starts from 0, while 
the checkpoint subtask id starts from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id; will simply using checkpoint subtask ID - 1 be OK?
Thanks



Re: Encountered error while consuming partitions

2020-02-13 Thread Zhijiang
Thanks for reporting this issue; I also agree with the analysis below. 
Actually we encountered the same issue several years ago and solved it via 
the Netty idle handler as well.

Let's track it via the ticket [1] as the next step.

[1] https://issues.apache.org/jira/browse/FLINK-16030

Best,
Zhijiang


--
From:张光辉 
Send Time:2020 Feb. 12 (Wed.) 22:19
To:Benchao Li 
Cc:刘建刚 ; user 
Subject:Re: Encountered error while consuming partitions

Network can fail in many ways, sometimes pretty subtle (e.g. high ratio packet 
loss). 

The problem is that the long TCP connection between the Netty client and server is 
lost; the server then fails to send messages to the client and shuts down the 
channel. The Netty client does not know that the connection has been 
disconnected, so it keeps waiting. 

To detect whether a long TCP connection is alive, the Netty client and server have 
two options: TCP keepalives and heartbeats.
The TCP keepalive interval is 2 hours by default. When the error occurs, if you 
continue to wait for 2 hours, the Netty client will trigger an exception and enter 
failover recovery.
If you want to detect a dead connection quickly, Netty provides 
IdleStateHandler, which uses a ping-pong mechanism: if the Netty client sends 
n consecutive ping messages and receives no pong message, it triggers an 
exception.
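
For illustration, a minimal sketch of wiring Netty's IdleStateHandler into a channel 
pipeline; the 60 second timeout is illustrative, and this shows generic Netty usage 
rather than Flink's actual fix tracked in [1].

import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

public class IdleAwareInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Fires an IdleStateEvent if nothing is read for 60 seconds.
        ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
        // A custom handler added after this would override userEventTriggered(...)
        // to close the channel and trigger failover on the idle event.
    }
}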





Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Zhijiang
Really great work and thanks everyone involved, especially for the release 
managers!

Best,
Zhijiang
--
From:Kurt Young 
Send Time:2020 Feb. 13 (Thu.) 11:06
To:[None]
Cc:user ; dev 
Subject:Re: [ANNOUNCE] Apache Flink 1.10.0 released

Congratulations to everyone involved! 
Great thanks to Yu & Gary for being the release manager!

Best,
Kurt


On Thu, Feb 13, 2020 at 10:06 AM Hequn Cheng  wrote:

Great thanks to Yu & Gary for being the release manager! 
Also thanks to everyone who made this release possible!

Best, Hequn
On Thu, Feb 13, 2020 at 9:54 AM Rong Rong  wrote:
Congratulations, a big thanks to the release managers for all the hard works!!

--
Rong
On Wed, Feb 12, 2020 at 5:52 PM Yang Wang  wrote:
Excellent work. Thanks Gary & Yu for being the release manager.


Best,
Yang
Jeff Zhang  于2020年2月13日周四 上午9:36写道:
Congratulations! Really appreciated your hard work.

Yangze Guo  于2020年2月13日周四 上午9:29写道:
Thanks, Gary & Yu. Congrats to everyone involved!

 Best,
 Yangze Guo

 On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li  wrote:
 >
 > Congratulations! Great work.
 >
 > Best,
 > Jingsong Lee
 >
 > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu  wrote:
 >>
 >> Great news!
 >> Thanks everyone involved !
 >> Thanks Gary and Yu for being the release manager !
 >>
 >> Best,
 >> Leonard Xu
 >>
 >> 在 2020年2月12日,23:02,Stephan Ewen  写道:
 >>
 >> Congrats to us all.
 >>
 >> A big piece of work, nicely done.
 >>
 >> Let's hope that this helps our users make their existing use cases easier 
 >> and also opens up new use cases.
 >>
 >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
 >>>
 >>> Greet work.
 >>>
 >>> Congxian Qiu  于2020年2月12日周三 下午10:11写道:
 >>>>
 >>>> Great work.
 >>>> Thanks everyone involved.
 >>>> Thanks Gary and Yu for being the release manager
 >>>>
 >>>>
 >>>> Best,
 >>>> Congxian
 >>>>
 >>>>
 >>>> Jark Wu  于2020年2月12日周三 下午9:46写道:
 >>>>>
 >>>>> Congratulations to everyone involved!
 >>>>> Great thanks to Yu & Gary for being the release manager!
 >>>>>
 >>>>> Best,
 >>>>> Jark
 >>>>>
 >>>>> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
 >>>>>>
 >>>>>> Cheers!
 >>>>>> Thanks Gary and Yu for the great job as release managers.
 >>>>>> And thanks to everyone whose contribution makes the release possible!
 >>>>>>
 >>>>>> Thanks,
 >>>>>> Zhu Zhu
 >>>>>>
 >>>>>> Wyatt Chun  于2020年2月12日周三 下午9:36写道:
 >>>>>>>
 >>>>>>> Sounds great. Congrats & Thanks!
 >>>>>>>
 >>>>>>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
 >>>>>>>>
 >>>>>>>> The Apache Flink community is very happy to announce the release of 
 >>>>>>>> Apache Flink 1.10.0, which is the latest major release.
 >>>>>>>>
 >>>>>>>> Apache Flink® is an open-source stream processing framework for 
 >>>>>>>> distributed, high-performing, always-available, and accurate data 
 >>>>>>>> streaming applications.
 >>>>>>>>
 >>>>>>>> The release is available for download at:
 >>>>>>>> https://flink.apache.org/downloads.html
 >>>>>>>>
 >>>>>>>> Please check out the release blog post for an overview of the 
 >>>>>>>> improvements for this new major release:
 >>>>>>>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html
 >>>>>>>>
 >>>>>>>> The full release notes are available in Jira:
 >>>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
 >>>>>>>>
 >>>>>>>> We would like to thank all contributors of the Apache Flink community 
 >>>>>>>> who made this release possible!
 >>>>>>>>
 >>>>>>>> Cheers,
 >>>>>>>> Gary & Yu
 >>
 >>
 >
 >
 > --
 > Best, Jingsong Lee


-- 
Best Regards

Jeff Zhang



Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread Zhijiang
Only chained operators can avoid the record serialization cost, but chaining 
cannot support a keyed stream.
If you deploy the downstream task in the same task manager as the upstream one, 
you can avoid the network shuffle cost, which still brings performance benefits.
As far as I know, @Till Rohrmann has implemented some enhancements in the scheduler 
layer to support such a requirement in release-1.10. You can give it a try when 
the RC candidate is ready.

Best,
Zhijiang


--
From:杨东晓 
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu 
Cc:user 
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only to make data go to the same subtask, but to the specific 
subtask that belongs to the same task manager as the upstream record. The key idea 
is to avoid shuffling between task managers.
I think KeyGroupRangeAssignment.java explains a lot about how to get the 
key group and subtask context that can make that happen.
Do you know if serialization still happens while data is transferred 
between operators in the same task manager?
Thanks.
Congxian Qiu  于2020年1月9日周四 上午1:55写道:

Hi

If you just want to make sure some key goes into the same subtask, does custom 
key selector[1] help?

For the keygroup and subtask information, you can ref to 
KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can 
ref to doc[3]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Best,
Congxian
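
For reference, a minimal sketch using the assignment utilities from [2] (the key 
and parallelism values are illustrative):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupLookup {
    public static void main(String[] args) {
        int maxParallelism = 128; // must match the job's configured max parallelism
        int parallelism = 8;      // the operator's actual parallelism
        Object key = "someKey";

        // Key group the key falls into (murmur hash of the key's hashCode, mod maxParallelism).
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        // 0-based index of the operator subtask that owns this key group.
        int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key group = " + keyGroup + ", subtask = " + subtaskIndex);
    }
}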

杨东晓  于2020年1月9日周四 上午7:47写道:
Hi, I'm trying to do some optimization of a Flink 'keyBy' process function. Is 
it possible to find out which key group a given key belongs to, and, 
essentially, which subtask a given key group belongs to?
The motivation is that we want to force the data records from 
upstream to go to a downstream subtask on the same task manager. That means even 
if we use a keyed stream function, we still want no cross-JVM communication to 
happen at run time.
And if we can achieve that, can we also avoid the expensive record 
serialization cost, since data is only transferred within the same task manager JVM instance?

Thanks.



Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Zhijiang
The log way is simple for tracing, and you can also grep some keywords to find 
the messages you need, to avoid skimming through the whole of some large logs.
I am not quite sure what your specific motivation for doing this is. Besides the 
log way, you can also monitor the thread stacks to confirm whether it is 
happening, but maybe that is not very convenient.
Another possible way is via the checkpoint metrics, which record the 
sync/async duration times; maybe they can also satisfy your requirements.

Best,
Zhijiang 


--
From:RKandoji 
Send Time:2020 Jan. 8 (Wed.) 10:23
To:William C 
Cc:user 
Subject:Re: How to verify if checkpoints are asynchronous or sync

Thanks for the reply.
I will check and enable debug logs specifically for the class that contains 
this log.
But in general the logs are already huge and I'm trying to suppress some of 
them, so I'm wondering if there is any other way?

Thanks,
RKandoji


On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:
Can you enable debug log to check with that?

 regards.

 on 2020/1/8 6:36, RKandoji wrote:
 > But I'm curious if there is way to verify if the checkpoints are 
 > happening asynchronously or synchronously.
 > 



Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

2019-12-25 Thread Zhijiang
Hi Joe,

Your requirement is effective exactly-once delivery for an external sink. I think 
your option 2 with TwoPhaseCommitSinkFunction is the right way to go.
Unfortunately I am not quite familiar with this part, so I cannot give you 
specific suggestions for using it, especially for your concern about storing the 
checkpoint id.
After the holiday some people with rich experience of it can provide you with more 
professional ideas, I guess. :)

ATM you can refer to the simple implementation in 
TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the complex one in 
FlinkKafkaProducer for more insight.
In addition, the StreamingFileSink also implements exactly-once for sinks. 
You might also refer to it to get some insights if possible.

Best,
Zhijiang




--
From:Joe Hansen 
Send Time:2019 Dec. 26 (Thu.) 01:42
To:user 
Subject:Aggregating Movie Rental information in a DynamoDB table using DynamoDB 
streams and Flink

Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being
stored in one DynamoDB table and store the running total of the
aggregation in another table. How do I ensure exactly-once
aggregation?

I currently store movie rental information in a DynamoDB table named
MovieRentals: {movie_title, rental_period_in_days, order_date,
rent_amount}

We have millions of movie rentals happening on any given day.  Our web
application needs to display the aggregated rental amount for any
given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title
on the MovieRental DynamoDB stream and store the aggregated rental
amounts in another DynamoDB table named RentalAmountsByMovie:
{movie_title, total_rental_amount}

How do I ensure that the RentalAmountsByMovie amounts are accurate? I.e.,
how do I prevent the results from any checkpoint from updating the
RentalAmountsByMovie table records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table
and do conditional updates to handle the scenario described above?
2) I can possibly implement TwoPhaseCommitSinkFunction that talks to
DynamoDB. However, according to Flink documentation the commit
function can be called more than once and hence needs to be
idempotent. So even this solution requires checkpoint-ids to be stored
on the target store.
3) Another pattern seems to be storing the time-window aggregation
results in the RentalAmountsByMovie table. And the webapp will have to
compute the running total on the fly. I don't like this solution for
its latency implications to the webapp.
4) May be I can use Flink's Queryable state feature. However, that
feature seems to be in Beta:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

I imagine this is a very common aggregation use case. How do folks
usually handle **updating aggregated results in Flink external
sinks**?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!
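
For orientation, here is the rough shape of option 2, heavily abridged. The Rental 
and RentalTransaction types and all DynamoDB interactions are hypothetical 
placeholders; a real implementation must make commit() idempotent (e.g. via a 
conditional update keyed on a transaction/checkpoint id), since commit() may be 
re-invoked after a failure.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class DynamoTwoPhaseSink extends TwoPhaseCommitSinkFunction<
        DynamoTwoPhaseSink.Rental, DynamoTwoPhaseSink.RentalTransaction, Void> {

    public DynamoTwoPhaseSink() {
        super(TypeInformation.of(RentalTransaction.class).createSerializer(new ExecutionConfig()),
              TypeInformation.of(Void.class).createSerializer(new ExecutionConfig()));
    }

    @Override
    protected RentalTransaction beginTransaction() {
        return new RentalTransaction(); // start buffering a new set of updates
    }

    @Override
    protected void invoke(RentalTransaction txn, Rental value, Context context) {
        txn.buffered.add(value); // accumulate into the open transaction
    }

    @Override
    protected void preCommit(RentalTransaction txn) {
        // Flush buffered writes somewhere durable so they survive a failure
        // between the checkpoint completing and commit() running.
    }

    @Override
    protected void commit(RentalTransaction txn) {
        // Apply the buffered deltas to RentalAmountsByMovie idempotently,
        // e.g. a conditional update that records the transaction id.
    }

    @Override
    protected void abort(RentalTransaction txn) {
        // Discard buffered writes.
    }

    // Hypothetical types, sketched only for illustration.
    public static class Rental { public String movieTitle; public long rentAmount; }
    public static class RentalTransaction { public List<Rental> buffered = new ArrayList<>(); }
}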



Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread Zhijiang
If I understood correctly, different Kafka partitions would be consumed by 
different source tasks with different watermark progress. The Flink 
framework aligns the different watermarks and only outputs the smallest 
watermark among them, so the events from slow partitions would not be discarded, 
because the downstream operator only sees the watermark based on the slow 
partition atm. You can refer to [1] for some details.

As for rewinding the partition offset to a previous position, I guess it only 
happens in the failure-recovery case or when you manually restart the job. Either 
way, all the topology tasks would be restarted and previously received watermarks 
are cleared.
So it would also not discard the events in this case. Only if you could 
rewind some source tasks to previous positions while keeping other downstream 
tasks still running might you have the issue you are concerned about. But Flink 
does not support such an operation/function atm. :) 

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

Best,
Zhijiang
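
As a side note, the per-partition watermarking described in [1] can be attached 
directly to the Kafka consumer, so each partition's event-time skew is tracked 
individually. A minimal sketch (the MyEvent type and its deserialization schema 
are illustrative):

import java.util.Properties;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<MyEvent> consumer =
        new FlinkKafkaConsumer<>("events", new MyEventDeserializationSchema(), new Properties());
// Watermarks are then generated per Kafka partition inside the source.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTime(); // illustrative event-time field
            }
        });
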
--
From:邢瑞斌 
Send Time:2019 Dec. 25 (Wed.) 20:27
To:user-zh ; user 
Subject:Rewind offset to a previous position and ensure certainty.

Hi,

I'm trying to use Kafka as an event store and I want to create several 
partitions to improve read/write throughput. Occasionally I need to rewind 
offset to a previous position for recomputing. Since order isn't guaranteed 
among partitions in Kafka, does this mean that Flink won't produce the same 
results as before when rewind even if it uses event time? For example, consumer 
for a partition progresses extremely fast and raises watermark, so events from 
other partitions are discarded. Is there any ways to prevent this from 
happening?

Thanks in advance!

Ruibin



Re: Flink task node shut it self off.

2019-12-24 Thread Zhijiang
If you use the RocksDB state backend, it might consume extra native memory. 
Some resource frameworks like YARN will kill the container if the 
memory usage exceeds some threshold. You can also double-check whether that 
happens in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any state. 
Checkpoints are disk based...
On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang,  wrote:

Hi John,

Thanks for the positive comments on using Flink! No matter whether you use 
at-least-once or exactly-once checkpointing, it should never lose a message 
during failure recovery.

Unfortunately I cannot access the logs you posted. Generally speaking, a 
longer checkpoint interval means replaying more source data after failure 
recovery.
In my experience, a 5 second checkpoint interval is too frequent, and you might 
increase it to 1 minute or so. You can also monitor 
how long checkpoints take to finish in your application, and then adjust 
the interval accordingly.
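
For reference, the interval is the argument to enableCheckpointing; a sketch (60 
seconds is just a starting point to tune from):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 seconds instead of every 5.
env.enableCheckpointing(60_000L);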

Concerning the node shutdown you mentioned, I am not quite sure whether it 
is related to your short checkpoint interval. Did you configure the heap state 
backend? The hs_err file really indicates that your job encountered a 
memory issue, so it is better to somehow increase your task manager memory. 
And if you can analyze the hs_err dump via some profiler tool to check 
the memory usage, that might be more helpful for finding the root cause.

Best,
Zhijiang 

--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute 
CPU load of 15. And we also got an hs_err file which says we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?








Re: Flink task node shut it self off.

2019-12-22 Thread Zhijiang
Hi John,

Thanks for the positive comments on using Flink! No matter whether you use 
at-least-once or exactly-once checkpointing, it should never lose a message 
during failure recovery.

Unfortunately I cannot access the logs you posted. Generally speaking, a 
longer checkpoint interval means replaying more source data after failure 
recovery.
In my experience, a 5 second checkpoint interval is too frequent, and you might 
increase it to 1 minute or so. You can also monitor 
how long checkpoints take to finish in your application, and then adjust 
the interval accordingly.

Concerning the node shutdown you mentioned, I am not quite sure whether it 
is related to your short checkpoint interval. Did you configure the heap state 
backend? The hs_err file really indicates that your job encountered a 
memory issue, so it is better to somehow increase your task manager memory. 
And if you can analyze the hs_err dump via some profiler tool to check 
the memory usage, that might be more helpful for finding the root cause.

Best,
Zhijiang 


--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute 
CPU load of 15. And we also got an hs_err file which says we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?







Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Zhijiang
The hint of mmap usage below is really helpful to locate this problem. I forgot 
this biggest change for batch job in release-1.9.
The blocking type option can be set to `file` as Piotr suggested to behave 
similar as before. I think it can solve your problem. 


--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 23:37
To:Piotr Nowojski 
Cc:Zhijiang ; user@flink.apache.org 

Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Thanks, Piotr. We’ll rerun our apps today with this and get back to you. 

// ah

From: Piotr Nowojski  On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] 
Cc: Zhijiang ; user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi,
I would suspect this:
https://issues.apache.org/jira/browse/FLINK-12070
To be the source of the problems.
There seems to be a hidden configuration option that avoids using memory mapped 
files:
taskmanager.network.bounded-blocking-subpartition-type: file
Could you test if helps?
Piotrek



On 21 Nov 2019, at 15:22, Hailu, Andreas  wrote:
Hi Zhijiang,

I looked into the container logs for the failure, and didn’t see any specific 
OutOfMemory errors before it was killed. I ran the application using the same 
config this morning on 1.6.4, and it went through successfully. I took a 
snapshot of the memory usage from the dashboard and can send it to you if you 
like for reference.

What stands out to me as suspicious is that on 1.9.1, the application is using 
nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its 
runtime and succeeds. The JVM heap memory itself never exceeds its capacity, 
peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes 
around mapped memory.

// ah

From: Zhijiang  
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi Andreas,

You are running a batch job, so there should be no native memory used by the 
RocksDB state backend. Then I guess either heap memory or direct memory is being 
over-used. The heap managed memory is mainly used by batch operators, and direct 
memory is used by the network shuffle. Can you further check whether there are 
any logs indicating HeapOutOfMemory or DirectOutOfMemory before it was killed? If 
the used memory exceeds the JVM configuration, it should throw that error. Then 
we can further narrow down the scope. I cannot remember the memory-related 
changes for managed memory or the network stack, especially since they really 
span several releases.

Best,
Zhijiang

--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org 
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering] 
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-20 Thread Zhijiang
Hi Andreas,

You are running a batch job, so there should be no native memory used by the 
RocksDB state backend. Then I guess either heap memory or direct memory is being 
over-used. The heap managed memory is mainly used by batch operators, and direct 
memory is used by the network shuffle. Can you further check whether there are 
any logs indicating HeapOutOfMemory or DirectOutOfMemory before it was killed? If 
the used memory exceeds the JVM configuration, it should throw that error. Then 
we can further narrow down the scope. I cannot remember the memory-related 
changes for managed memory or the network stack, especially since they really 
span several releases.

Best,
Zhijiang


--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org 
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering] 
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
at 
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)

I drilled further down into the YARN app logs, and I found that the container 
was running out of physical memory:

2019-11-19 12:49:23,068 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e42_1574076744505_9444_01_04 because: Container 
[pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running 
beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory 
used; 13.9 GB of 25.2 GB virtual memory used. Killing container.

This is what leads to my suspicion, as this resource configuration worked just 
fine on 1.6.4.

I’m working on getting heap dumps of these applications myself to try to get a 
better understanding of what’s causing the blowup in required physical memory, 
but it would be helpful if anyone knew what relevant changes have been made 
between these versions, or where else I could look. There are some features in 
1.9 that we’d like to use in our flows, so getting this sorted out, no pun 
intended, is a priority for us.

Best,
Andreas

 Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to:  
www.gs.com/privacy-notices



Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Zhijiang
You can refer to this document [1] for the rest API details.
Actually the backpressure URI is 
"/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is 
easy to get the jobid and vertexid.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
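
As a rough illustration, a minimal Java sketch of polling that endpoint from a 
separate thread or process. The host/port and both IDs are placeholders you 
would have to look up first (e.g. via /jobs and /jobs/:jobid):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BackPressurePoller {

    public static void main(String[] args) throws Exception {
        // Placeholder IDs; look them up via /jobs and /jobs/:jobid first.
        String jobId = "...";
        String vertexId = "...";
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        // The first request typically triggers a fresh sample, so the result
        // may only become meaningful after re-polling a few seconds later.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            // The JSON body contains the per-subtask back pressure ratios.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}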

Best,
Zhijiang
--
From:Felipe Gutierrez 
Send Time:2019 Nov. 7 (Thu.) 00:06
To:Chesnay Schepler 
Cc:Zhijiang ; user 
Subject:Re: How can I get the backpressure signals inside my function or 
operator?

If I can trigger the sample via the REST API, that is good enough for a POC. 
Then I can read from some in-memory storage using a separate thread within the 
operator. But which REST API gives me the ratio value for backpressure?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler  wrote:

I don't think there is a truly sane way to do this. 
I could envision a separate application triggering samples via the REST API, 
writing the results into Kafka, which your operator can read. This is probably 
the most reasonable solution I can come up with.
Any attempt at accessing the TaskExecutor or metrics from within the operator 
is inadvisable; you'd be encroaching into truly hacky territory.
You could also do your own backpressure sampling within your operator (separate 
thread within the operator executing the same sampling logic), but I don't know 
how easy it would be to re-use Flink code.

On 06/11/2019 13:40, Felipe Gutierrez wrote:
Does anyone know which metric I can rely on to know whether a given operator is 
triggering backpressure? 
Or how can I call the same Java object that the Flink UI calls to get the 
backpressure ratio?

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez  
wrote:
Hi Zhijiang, 

thanks for your reply. Yes, you understood correctly.
The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength" 
inside the operator might be due to the way the Flink runtime architecture was 
designed. But I was wondering what kind of signal I can get. I guess I could 
get some backpressure message, because backpressure works by slowing down the 
upstream operators. 

For example, I can see the ratio per sub-task on the web interface [1], i.e. 
for the physical operators. Is there any message flowing backward that I can 
get? Is there anything that would let me avoid relying on some external 
storage?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 5, 2019 at 12:23 PM Zhijiang  wrote:
Hi Felipe,

That is an interesting idea to control the upstream's output based on 
downstream's input.

 If I understood correctly, the preAggregate operator would trigger flushing 
its output while the reduce operator is idle/hungry. In contrast, the 
preAggregate would continue aggregating data in the case of back pressure.

I think this requirement is valid, but unfortunately I guess you cannot get the 
back pressure signal at the operator level. AFAIK only the upper task level can 
get the input/output state to decide whether to process or not.

If you want to get the reduce's metric of 
`Shuffle.Netty.Input.Buffers.inputQueueLength` on preAggregate side, you might 
rely on some external metric reporter to query it if possible.

Best,
Zhijiang 

--
From:Felipe Gutierrez 
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user 
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

 let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> 
sink" job and the reducer is sending backpressure signals to the preAggregate, 
map and source operator. How do I get those signals inside my operator's 
implementation?
 I guess inside the function it is not possible. But if I have my own operator 
implemented (preAggregate) can I get those backpressure signals?

 I want to get the messages "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] 
on my preAggregate operator in order to decide when I stop the pre-aggregation 
and flush tuples, or when I keep pre-aggregating. It is something like the 
"credit based control on the network stack" [2].

 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
 [2] https://www.youtube.com/watch?v=AbqatHF3tZI

 Thanks!
 Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com  





Re: What metrics can I see the root cause of "Buffer pool is destroyed" message?

2019-11-06 Thread Zhijiang
Hi Felipe,

"Buffer pool is destroyed" is mainly caused by canceling task. That means there 
are other tasks failure which would trigger canceling all the topology tasks by 
job master.
So if you want to find the root cause, it is proper to check the job master log 
to find the first failure which triggers the following cancel operations.

In addition, which flink version are you using?

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2019 Nov. 6 (Wed.) 19:12
To:user 
Subject:What metrics can I see the root cause of "Buffer pool is destroyed" 
message?

Hi community,

Looking at the code [1], it seems that it is related to not having 
availableMemorySegments anymore. I am looking at several metrics, but they have 
not helped me understand where I can measure the root cause of this error 
message.

- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not seem 
to give me a related cause. 
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength: I 
see my reducer operator always with a queue length equal to 4. The 
pre-aggregate task sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and 
flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength show 
my source task at 100% several times. But my error message comes from the 
pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond 
DOES show that the pre-aggregate task is consuming a lot. But which metric can 
I relate this to, so I know in advance how much is "a lot"?

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265

Thanks for your suggestions and here is my stack trace:

java.lang.RuntimeException: Buffer pool is destroyed.
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at 
org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
 at 
org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
 at 
org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
 at 
org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: How can I get the backpressure signals inside my function or operator?

2019-11-05 Thread Zhijiang
Hi Felipe,

That is an interesting idea to control the upstream's output based on 
downstream's input.

 If I understood correctly, the preAggregate operator would trigger flushing 
its output while the reduce operator is idle/hungry. In contrast, the 
preAggregate would continue aggregating data in the case of back pressure.

I think this requirement is valid, but unfortunately I guess you cannot get the 
back pressure signal at the operator level. AFAIK only the upper task level can 
get the input/output state to decide whether to process or not.

If you want to get the reduce's metric of 
`Shuffle.Netty.Input.Buffers.inputQueueLength` on preAggregate side, you might 
rely on some external metric reporter to query it if possible.

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user 
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> 
sink" job and the reducer is sending backpressure signals to the preAggregate, 
map and source operator. How do I get those signals inside my operator's 
implementation?
I guess inside the function it is not possible. But if I have my own operator 
implemented (preAggregate) can I get those backpressure signals?

I want to get the messages "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] 
on my preAggregate operator in order to decide when I stop the pre-aggregation 
and flush tuples, or when I keep pre-aggregating. It is something like the 
"credit based control on the network stack" [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread zhijiang
Congratulations Zili!
--
From:Becket Qin 
Send Time:2019年9月12日(星期四) 03:43
To:Paul Lam 
Cc:Rong Rong ; dev ; user 

Subject:Re: [ANNOUNCE] Zili Chen becomes a Flink committer

Congrats, Zili!
On Thu, Sep 12, 2019 at 9:39 AM Paul Lam  wrote:
Congratulations Zili!

Best,
Paul Lam

在 2019年9月12日,09:34,Rong Rong  写道:
Congratulations Zili!

--
Rong
On Wed, Sep 11, 2019 at 6:26 PM Hequn Cheng  wrote:
Congratulations!

Best, Hequn
On Thu, Sep 12, 2019 at 9:24 AM Jark Wu  wrote:
Congratulations Zili!

 Best,
 Jark

 On Wed, 11 Sep 2019 at 23:06,  wrote:

 > Congratulations, Zili.
 >
 >
 >
 > Best,
 >
 > Xingcan
 >
 >
 >
 > *From:* SHI Xiaogang 
 > *Sent:* Wednesday, September 11, 2019 7:43 AM
 > *To:* Guowei Ma 
 > *Cc:* Fabian Hueske ; Biao Liu ;
 > Oytun Tez ; bupt_ljy ; dev <
 > d...@flink.apache.org>; user ; Till Rohrmann <
 > trohrm...@apache.org>
 > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer
 >
 >
 >
 > Congratulations!
 >
 >
 >
 > Regards,
 >
 > Xiaogang
 >
 >
 >
 > Guowei Ma  于2019年9月11日周三 下午7:07写道:
 >
 > Congratulations Zili !
 >
 >
 > Best,
 >
 > Guowei
 >
 >
 >
 >
 >
 > Fabian Hueske  于2019年9月11日周三 下午7:02写道:
 >
 > Congrats Zili Chen :-)
 >
 >
 >
 > Cheers, Fabian
 >
 >
 >
 > Am Mi., 11. Sept. 2019 um 12:48 Uhr schrieb Biao Liu :
 >
 > Congrats Zili!
 >
 >
 >
 > Thanks,
 >
 > Biao /'bɪ.aʊ/
 >
 > On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
 >
 > Congratulations!
 >
 >
 >
 > ---
 >
 > Oytun Tez
 >
 >
 >
 > *M O T A W O R D*
 >
 > *The World's Fastest Human Translation Platform.*
 >
 > oy...@motaword.com — www.motaword.com
 >
 >
 >
 >
 >
 > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
 >
 > Congratulations!
 >
 >
 >
 > Best,
 >
 > Jiayi Liao
 >
 >
 >
 >  Original Message
 >
 > *Sender:* Till Rohrmann
 >
 > *Recipient:* dev; user
 >
 > *Date:* Wednesday, Sep 11, 2019 17:22
 >
 > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
 >
 >
 >
 > Hi everyone,
 >
 >
 >
 > I'm very happy to announce that Zili Chen (some of you might also know
 > him as Tison Kun) accepted the offer of the Flink PMC to become a committer
 > of the Flink project.
 >
 >
 >
 > Zili Chen has been an active community member for almost 16 months now.
 > He helped push the FLIP-6 effort over the finish line, ported a lot of
 > legacy code tests, removed a good part of the legacy code, contributed
 > numerous fixes, is involved in Flink's client API refactoring, drives
 > the refactoring of Flink's HighAvailabilityServices and much more. Zili
 > Chen also helped the community with PR reviews, reporting Flink issues,
 > answering user mails and being very active on the dev mailing list.
 >
 >
 >
 > Congratulations Zili Chen!
 >
 >
 >
 > Best, Till
 >
 > (on behalf of the Flink PMC)
 >
 >




Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread zhijiang
Congratulations Andrey, great work and well deserved!

Best,
Zhijiang
--
From:Till Rohrmann 
Send Time:2019年8月14日(星期三) 15:26
To:dev ; user 
Subject:[ANNOUNCE] Andrey Zagrebin becomes a Flink committer

Hi everyone,

I'm very happy to announce that Andrey Zagrebin accepted the offer of the Flink 
PMC to become a committer of the Flink project.

Andrey has been an active community member for more than 15 months. He has 
helped shape numerous features such as State TTL, the FRocksDB release, the 
Shuffle service abstraction, FLIP-1, result partition management and various 
fixes/improvements. He's also frequently helping out on the user@f.a.o mailing 
lists.

Congratulations Andrey!

Best, Till 
(on behalf of the Flink PMC)



Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread zhijiang
Hi paul,

In theory a broadcast operator cannot be chained, because chaining is only 
feasible for one-to-one modes like forward, not for all-to-all modes.
When chained, the next operator can process the raw record emitted by the head 
operator directly. When not chained, the emitted record must be serialized into 
a buffer, which the downstream operator consumes either locally or via the 
network. So the chained way has the best performance in theory compared to 
non-chained.

In your case, if you cannot bypass the requirement for broadcast, then you have 
to accept the non-chained way and test whether the real performance is 
acceptable. If the performance does not reach your requirements, we could 
further consider other improvements.

Best,
Zhijiang
--
From:Piotr Nowojski 
Send Time:2019年8月6日(星期二) 14:55
To:黄兆鹏 
Cc:user 
Subject:Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏  wrote:
Hi, Piotrek,
I previously considered your first advice (use a union record type), but I 
found that the schema would only be sent to one subtask of the operator (for 
example, operatorA), and the other subtasks of the operator are not aware of 
it. 
In this case, is there anything I have missed? 

Thank you!





-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Have you measured the performance impact of braking the operator chain?

This is a current limitation of Flink chaining: if an operator has two inputs, 
it cannot be chained to something else (only one-input operators are chained 
together). There are plans to address this issue in the future.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record OR Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send the schema with every record (see the sketch 
after this list).
- another (ugly) solution is to implement the BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking, and it 
just makes many things more complicated.
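
A minimal sketch of that union idea using Flink's Either type. MyRecord and 
MySchema are hypothetical placeholders for your own classes, and the 
returns(...) calls are only there to work around lambda type erasure:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;

public class UnionSketch {

    // Merge the record stream and the (rare) schema-change stream into one
    // one-input stream, so downstream operators remain chainable.
    public static DataStream<Either<MyRecord, MySchema>> merge(
            DataStream<MyRecord> records, DataStream<MySchema> schemaChanges) {

        TypeInformation<Either<MyRecord, MySchema>> type =
                TypeInformation.of(new TypeHint<Either<MyRecord, MySchema>>() {});

        DataStream<Either<MyRecord, MySchema>> left =
                records.map(Either::<MyRecord, MySchema>Left).returns(type);
        DataStream<Either<MyRecord, MySchema>> right =
                schemaChanges.map(Either::<MyRecord, MySchema>Right).returns(type);

        // Each downstream operator transforms e.left() when isLeft() holds and
        // simply forwards the schema events untouched otherwise.
        return left.union(right);
    }

    public static class MyRecord {}
    public static class MySchema {}
}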

Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏  wrote:
Hi Piotrek,
Thanks for your reply. My broadcast stream just listens for changes of the 
schema, and those are very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listens for changes of the schema and 
broadcasts them to every operator that will handle the data, just as I posted 
originally:

DataStream: OperatorA -> OperatorB -> OperatorC
                ^           ^           ^
                |           |           |
                +-----------+-----------+
                     BroadcastStream

the second approach is that I have an operator that joins my data and schema 
together and sends them to the downstream operators:

DataStream: MergeSchemaOperator -> OperatorA -> OperatorB -> OperatorC
                    ^
                    |
             BroadcastStream


The benefit of the first approach is that the Flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema is broadcast to each operator.
The disadvantage of the first approach is that it breaks the operator chain, so 
operators may not be executed in the same slot, resulting in worse performance.

The second approach does not have that problem, but each message carries its 
schema info among operators, which costs roughly 2x for serialization and 
deserialization between operators.

Is there a better workaround where all the operators notice the schema change 
while at the same time not breaking operator chaining? 

Thanks!
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Broadcasting will break an operator chain. However, my best guess is that the 
Kafka source will still be a performance bottleneck in your job. Also, network 
exchanges add measurable overhead only if your records are very lightweight and 
easy to process (for example, if you are using RocksDB then you can just ignore 
network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek

Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread zhijiang
Congratulations Rong!

Best,
Zhijiang
--
From:Kurt Young 
Send Time:2019年7月11日(星期四) 22:54
To:Kostas Kloudas 
Cc:Jark Wu ; Fabian Hueske ; dev 
; user 
Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

Congratulations Rong!

Best,
Kurt


On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas  wrote:
Congratulations Rong!
On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
Congratulations Rong Rong! 
Welcome on board!
On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  wrote:
Hi everyone,

I'm very happy to announce that Rong Rong accepted the offer of the Flink PMC 
to become a committer of the Flink project.

Rong has been contributing to Flink for many years, mainly working on SQL and 
Yarn security features. He's also frequently helping out on the user@f.a.o 
mailing lists.

Congratulations Rong!

Best, Fabian 
(on behalf of the Flink PMC)



Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-24 Thread zhijiang
Thanks for opening this ticket; I will watch it.

Flink does not handle OOM issues specially. I remember we discussed a similar 
issue before, but I forget the conclusion, or whether there were other concerns 
about it.
I am not sure whether it is worth fixing atm; maybe Till or Chesnay could give 
a final decision.

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019年6月25日(星期二) 11:10
To:zhijiang 
Cc:Chesnay Schepler ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

Hi Zhijiang

Thank you for your analysis. I agree with it. The solution may be to let the TM 
exit, like you mentioned, when any type of OOM occurs, because Flink has no 
control over a TM once an OOM occurs.

I fired a jira before, https://issues.apache.org/jira/browse/FLINK-12889.

Don't know it is worth to fix.

Thank you all.

Yours sincerely
Joshua
On Fri, Jun 21, 2019 at 5:32 PM zhijiang  wrote:
Thanks for the reminding @Chesnay Schepler .

I just looked through the related logs. Actually all five "Source: 
ServiceLog" tasks are not in a terminal state in the JM's view; the relevant 
process is as follows:

1. The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
2. The source task transitions its state from RUNNING to FAILED and then starts 
a canceler thread for canceling the task; we can see the log "Triggering 
cancellation of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state as of step 2; we 
can see the log "Attempting to cancel task" in the TM.

In the end, all five source tasks are not in terminal states in the JM log. I 
guess step 2 might not have created the canceler thread successfully, because 
the root failover was caused by an OOM while creating a native thread in step 
1, so creating the canceler thread may fail as well in this unstable OOM 
situation. If so, the source task would not be interrupted at all and thus 
would not report to the JM, even though its state had already changed to 
FAILED. 

The other vertex tasks do not trigger `Task#failExternally` in step 1 and only 
receive the cancel RPC from the JM in step 3. I guess that by this time, later 
than the source's failure, the canceler thread could be created successfully 
after some GCs, so these tasks could be canceled and reported to the JM side.

I think the key problem is that under an OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle OOM errors in 
an extreme way, like making the TM exit, to solve the potential issue.

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年6月21日(星期五) 16:34
To:zhijiang ; Joshua Fan 
Cc:user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

 Echoing my thoughts from earlier: from the logs it looks as if the TM never 
even submits the terminal state RPC calls for several tasks to the JM.

 On 21/06/2019 10:30, zhijiang wrote:
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really 
in CANCELED state on the TM side but in CANCELING state on the JM side, then it 
might indicate that the terminal state RPC was not received by the JM. I am not 
sure whether the OOM would cause this issue, resulting in unexpected behavior.

In addition, you mentioned these tasks were still active after the OOM and 
after being asked to cancel, so I am not sure what the specific period is for 
your attached TM stack. It might help if you could provide the corresponding TM 
log and JM log. 
From the TM log it is easy to check the tasks' final states. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019年6月20日(星期四) 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang 

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached in the first 
mail, were all canceled; the parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED; but the source subtasks had one 
subtask (subtask 4/5) CANCELED, and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state.

The subtask status described above is the JM view, but in the TM view all of 
the source subtasks were FAILED; I do not know why the JM was not notified 
about this.

As all of the failures were triggered by an OOM (the subtask could not create a 
native thread when checkpointing), I also dumped the stack of the JVM. It shows 
the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are 
still acti

Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Thanks for the reminding @Chesnay Schepler .

I just looked through the related logs. Actually all five "Source: 
ServiceLog" tasks are not in a terminal state in the JM's view; the relevant 
process is as follows:

1. The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
2. The source task transitions its state from RUNNING to FAILED and then starts 
a canceler thread for canceling the task; we can see the log "Triggering 
cancellation of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state as of step 2; we 
can see the log "Attempting to cancel task" in the TM.

In the end, all five source tasks are not in terminal states in the JM log. I 
guess step 2 might not have created the canceler thread successfully, because 
the root failover was caused by an OOM while creating a native thread in step 
1, so creating the canceler thread may fail as well in this unstable OOM 
situation. If so, the source task would not be interrupted at all and thus 
would not report to the JM, even though its state had already changed to 
FAILED. 

The other vertex tasks do not trigger `Task#failExternally` in step 1 and only 
receive the cancel RPC from the JM in step 3. I guess that by this time, later 
than the source's failure, the canceler thread could be created successfully 
after some GCs, so these tasks could be canceled and reported to the JM side.

I think the key problem is that under an OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle OOM errors in 
an extreme way, like making the TM exit, to solve the potential issue.

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年6月21日(星期五) 16:34
To:zhijiang ; Joshua Fan 
Cc:user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

 Echoing my thoughts from earlier: from the logs it looks as if the TM never 
even submits the terminal state RPC calls for several tasks to the JM.

 On 21/06/2019 10:30, zhijiang wrote:
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really 
in CANCELED state on the TM side but in CANCELING state on the JM side, then it 
might indicate that the terminal state RPC was not received by the JM. I am not 
sure whether the OOM would cause this issue, resulting in unexpected behavior.

In addition, you mentioned these tasks were still active after the OOM and 
after being asked to cancel, so I am not sure what the specific period is for 
your attached TM stack. It might help if you could provide the corresponding TM 
log and JM log. 
From the TM log it is easy to check the tasks' final states. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019年6月20日(星期四) 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang 

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached in the first 
mail, were all canceled; the parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED; but the source subtasks had one 
subtask (subtask 4/5) CANCELED, and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state.

The subtask status described above is the JM view, but in the TM view all of 
the source subtasks were FAILED; I do not know why the JM was not notified 
about this.

As all of the failures were triggered by an OOM (the subtask could not create a 
native thread when checkpointing), I also dumped the stack of the JVM. It shows 
the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were 
still active after throwing an OOM and being asked to cancel. I attached the 
jstack file in this email.

Yours sincerely
Joshua  
On Wed, Jun 19, 2019 at 4:40 PM zhijiang  wrote:
As long as one task is in canceling state, the job status might still be in 
canceling state.

@Joshua, can you confirm that all of the tasks in the topology were already in 
a terminal state such as FAILED or CANCELED?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年6月19日(星期三) 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

 @Till, have you seen something like this before? Despite all source tasks 
 reaching a terminal state on a TM (FAILED), it does not send updates to 
 the JM for all of them, but only for a single one.

 On 18/06/2019 12:14, Joshua Fan wrote:
 > Hi All,
 > There is a topology of 3 operators: source, parser, and 
 > persist. 

Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really 
in CANCELED state on the TM side but in CANCELING state on the JM side, then it 
might indicate that the terminal state RPC was not received by the JM. I am not 
sure whether the OOM would cause this issue, resulting in unexpected behavior.

In addition, you mentioned these tasks were still active after the OOM and 
after being asked to cancel, so I am not sure what the specific period is for 
your attached TM stack. It might help if you could provide the corresponding TM 
log and JM log. 
From the TM log it is easy to check the tasks' final states. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019年6月20日(星期四) 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached in the first 
mail, were all canceled; the parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED; but the source subtasks had one 
subtask (subtask 4/5) CANCELED, and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state.

The subtask status described above is the JM view, but in the TM view all of 
the source subtasks were FAILED; I do not know why the JM was not notified 
about this.

As all of the failures were triggered by an OOM (the subtask could not create a 
native thread when checkpointing), I also dumped the stack of the JVM. It shows 
the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were 
still active after throwing an OOM and being asked to cancel. I attached the 
jstack file in this email.

Yours sincerely
Joshua
On Wed, Jun 19, 2019 at 4:40 PM zhijiang  wrote:
As long as one task is in canceling state, the job status might still be in 
canceling state.

@Joshua, can you confirm that all of the tasks in the topology were already in 
a terminal state such as FAILED or CANCELED?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年6月19日(星期三) 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till, have you seen something like this before? Despite all source tasks 
reaching a terminal state on a TM (FAILED), it does not send updates to 
the JM for all of them, but only for a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and 
> persist. Occasionally, 5 subtasks of the source encounter exceptions 
> and turn to FAILED; at the same time, one subtask of the parser runs 
> into an exception and turns to FAILED too. The jobmaster gets a message 
> about the parser's failure. The jobmaster then tries to cancel all the 
> subtasks; most of the subtasks of the three operators turn to CANCELED, 
> except the 5 subtasks of the source, because the state of those 5 
> was already FAILED before the jobmaster tried to cancel them. Then the 
> jobmaster cannot reach a final state but stays in FAILING state, 
> meanwhile the subtasks of the source stay in CANCELING state.
>
> The job ran on a Flink 1.7 cluster on YARN, and there is only one TM 
> with 10 slots.
>
> The attached files contain a JM log, a TM log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua




Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-19 Thread zhijiang
As long as one task is in canceling state, the job status might still be in 
canceling state.

@Joshua, can you confirm that all of the tasks in the topology were already in 
a terminal state such as FAILED or CANCELED?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年6月19日(星期三) 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till, have you seen something like this before? Despite all source tasks 
reaching a terminal state on a TM (FAILED), it does not send updates to 
the JM for all of them, but only for a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and 
> persist. Occasionally, 5 subtasks of the source encounter exceptions 
> and turn to FAILED; at the same time, one subtask of the parser runs 
> into an exception and turns to FAILED too. The jobmaster gets a message 
> about the parser's failure. The jobmaster then tries to cancel all the 
> subtasks; most of the subtasks of the three operators turn to CANCELED, 
> except the 5 subtasks of the source, because the state of those 5 
> was already FAILED before the jobmaster tried to cancel them. Then the 
> jobmaster cannot reach a final state but stays in FAILING state, 
> meanwhile the subtasks of the source stay in CANCELING state.
>
> The job ran on a Flink 1.7 cluster on YARN, and there is only one TM 
> with 10 slots.
>
> The attached files contain a JM log, a TM log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua



Re: A little doubt about the blog A Deep To Flink's NetworkStack

2019-06-16 Thread zhijiang
Hi Aitozi,

The current connection sharing is only suitable for the same JobVertex between 
TaskManagers, via the ConnectionIndex. I once tried to remove this limit in 
ticket [1], but we have not reached an agreement on it.
You could watch this JIRA if interested.


[1] https://issues.apache.org/jira/browse/FLINK-10462

Best,
Zhijiang
--
From:aitozi 
Send Time:2019年6月16日(星期日) 22:19
To:user 
Subject:A little doubt about the blog A Deep To Flink's NetworkStack

Hi, community

I read the blog "A Deep-Dive into Flink's Network Stack"
<https://flink.apache.org/2019/06/05/flink-network-stack.html>

In the chapter *Physical Transport*, it says: "if different subtasks
of the same task are scheduled onto the same TaskManager, their network
connections towards the same TaskManagers will be multiplexed and share a
single TCP channel for reduced resource usage."

But checking the code, I think each TaskManager has a NettyServer and a
NettyClient which work with a channel. So I think the TCP channel is shared
between remote TaskManager connections no matter whether different
subtasks of the same task are scheduled onto the same TaskManager or not.

Please correct me if my understanding is wrong.

Thanks
Aitozi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: java.io.FileNotFoundException in implementing exactly once

2019-06-11 Thread zhijiang
For the exactly-once mode before Flink 1.5, a temp dir is needed for spilling 
the blocked buffers during a checkpoint.

The temp dir is configured via `io.tmp.dirs` and the default value is 
`java.io.tmpdir`. In your case, your temp dir under `/tmp/` has some problems 
(files there might get deleted), so you could double-check this dir for the 
issue.
In addition, I suggest upgrading the Flink version, because Flink 1.3.3 is very 
old. After upgrading to Flink 1.5 or above, you do not need to consider this 
issue at all, because the exactly-once mode no longer spills data to disk.
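
If you stay on the old version for now, a sketch of pointing the spill dir at a 
more durable location in flink-conf.yaml (the path is only an illustration):

io.tmp.dirs: /var/flink/tmp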

Best,
Zhijiang 
--
From:syed 
Send Time:2019年6月12日(星期三) 10:16
To:user 
Subject:java.io.FileNotFoundException in implementing exactly once

Hi;
I am trying to run the standard WordCount application under the exactly-once 
and at-least-once processing guarantees, respectively. I can successfully run 
with the at-least-once guarantee; however, when running with the exactly-once 
guarantee, I face the following exception:
*Root exception:*
java.io.FileNotFoundException:
/tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer
(No such file or directory)
 at java.io.RandomAccessFile.open0(Native Method)
 at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
 at
org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
 at
org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
 at
org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
 at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
 at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:748)

*Keyed Aggregation -> Sink: Unnamed (1/1)*
java.io.FileNotFoundException:
/tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer
(No such file or directory)
 at java.io.RandomAccessFile.open0(Native Method)
 at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
 at
org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
 at
org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
 at
org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
 at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
 at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:748)

I am using Apache Kafka to keep data source available for checkpoints, and
using flink 1.3.3.

I face the same exception whether I explicitly use exactly-once 
[setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)] or only use 
[enableCheckpointing(1000)].
Kind regards;
Syed




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Apache Flink - Disabling system metrics and collecting only specific metrics

2019-06-11 Thread zhijiang
Hi Mans,

AFAIK, we have no switch to disable the general system metrics, which are 
useful for monitoring status and performance tuning. Only some advanced system 
metrics can be configured on or off, and those are disabled by default, so you 
do not need to be concerned about them.

Maybe you could implement a custom MetricReporter and then concentrate only on 
your required application metrics in the method 
`MetricReporter#notifyOfAddedMetric`, to show them in your backend.
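
A minimal sketch of such a filtering reporter. The "myapp." prefix is an 
assumption; use whatever naming convention your application metrics follow:

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FilteringReporter implements MetricReporter {

    private final Map<String, Metric> appMetrics = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {
        // no-op in this sketch; a real reporter would read its settings here
    }

    @Override
    public void close() {
        appMetrics.clear();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // Keep only the application's own metrics; ignore all system metrics.
        if (metricName.startsWith("myapp.")) {
            appMetrics.put(group.getMetricIdentifier(metricName), metric);
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        appMetrics.remove(group.getMetricIdentifier(metricName));
    }
}

The reporter is then registered in flink-conf.yaml, e.g. 
metrics.reporter.myfilter.class: com.example.FilteringReporter (the name and 
class here are illustrative).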

Best,
Zhijiang


--
From:M Singh 
Send Time:2019年6月12日(星期三) 00:30
To:User 
Subject:Apache Flink - Disabling system metrics and collecting only specific 
metrics

Hi:

I am working on an application and need to collect application metrics. I would 
like to use Flink's metrics framework for my application metrics.  From Flink's 
documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 it looks like Flink collects system metrics by default but I don't need those.

Is there any way to configure metrics to 

1. disable system metrics collection, 
2. collect only specific metrics.

If there is any documentation/configuration that I might have missed, please 
let me know.

Thanks

Mans





Re: [DISCUSS] Deprecate previous Python APIs

2019-06-11 Thread zhijiang
It is reasonable, as Stephan explained. +1 from my side! 
--
From:Jeff Zhang 
Send Time:2019年6月11日(星期二) 22:11
To:Stephan Ewen 
Cc:user ; dev 
Subject:Re: [DISCUSS] Deprecate previous Python APIs
 +1

Stephan Ewen  于2019年6月11日周二 下午9:30写道:

> Hi all!
>
> I would suggest deprecating the existing Python APIs for the DataSet and
> DataStream API with the 1.9 release.
>
> Background is that there is a new Python API under development.
> The new Python API is initially against the Table API. Flink 1.9 will
> support Table API programs without UDFs, 1.10 is planned to support UDFs.
> Future versions would support also the DataStream API.
>
> In the long term, Flink should have one Python API for DataStream and
> Table APIs. We should not maintain multiple different implementations and
> confuse users that way.
> Given that the existing Python APIs are a bit limited and not under active
> development, I would suggest to deprecate them in favor of the new API.
>
> Best,
> Stephan
>
>

-- 
Best Regards

Jeff Zhang



Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you could 
find the PR link in it.
--
From:Erai, Rahul 
Send Time:2019年6月4日(星期二) 18:19
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Thanks Zhijiang.
Can you point us to the JIRA for your fix?

Regards,
-Rahul

From: zhijiang  
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 
; Erai, Rahul [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] 
Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Yes, it is the same case as multiple slots in a TM. The source task and the 
co-group task are still in the same TM in this case. I think you might have 
enabled slot sharing, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and the fix is 
waiting for review atm. You could pick the code from the PR to verify the 
results if you like. The next release, 1.8.1, might cover this fix as well.
Best,
Zhijiang
--
From:Erai, Rahul 
Send Time:2019年6月4日(星期二) 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hello Zhijiang,
We have been seeing deadlocks with single-slot TMs as well. Attaching the 
thread dump as requested. It looks similar to what we had with multi-slot TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same 
as before. But you said you already used a single slot per TaskManager, right?
If it is the case of a single slot per TaskManager, you could attach the jstack 
when it occurs next time; otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月22日(星期三) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. I will try 
to get it when this occurs next. But from the looks of the console/logs it 
appears to be the same as the 2-slot case: the DataSource finishing up and the 
CoGroup trying to move from DEPLOYING to RUNNING (and stuck at DEPLOYING).

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we are seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how 
they go. We will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ;

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
Yes, it is the same case as multiple slots in a TM. The source task and the 
co-group task are still in the same TM in this case. I think you might have 
enabled slot sharing, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and the fix is 
waiting for review atm. You could pick the code from the PR to verify the 
results if you like. The next release, 1.8.1, might cover this fix as well.

Best,
Zhijiang
--
From:Erai, Rahul 
Send Time:2019年6月4日(星期二) 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hello Zhijiang,
We have been seeing deadlocks with single-slot TMs as well. Attaching the 
thread dump as requested. It looks similar to what we had with multi-slot TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same 
as before. But you said you already used a single slot per TaskManager, right?
If it is the case of a single slot per TaskManager, you could attach the jstack 
when it occurs next time; otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月22日(星期三) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. I will try 
to get it when this occurs next. But from the looks of the console/logs it 
appears to be the same as the 2-slot case: the DataSource finishing up and the 
CoGroup trying to move from DEPLOYING to RUNNING (and stuck at DEPLOYING).

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we are seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how 
they go. We will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I already analyzed this deadlock case based on the code. FLINK-10491 already 
solved one place that caused a deadlock in SpillableSubpartition, but this 
issue arises in a different place.
When the source task is trying to release subpartition memory and meanwhile 
another CoGroup task is submitted, triggering the source task to release its 
memory, it might cause a deadlock.
I will create a JIRA ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to have only one slot per TM; then one task never triggers 
another task to release

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-21 Thread zhijiang
If it is still the case of multiple slots in one TaskManager, it is the same 
as before. But you said you already used a single slot per TaskManager, right?

If it is the case of a single slot per TaskManager, you could attach the jstack 
when it occurs next time; otherwise it is not needed.

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月22日(星期三) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. I will try 
to get it when this occurs next. But from the looks of the console/logs it 
appears to be the same as the 2-slot case: the DataSource finishing up and the 
CoGroup trying to move from DEPLOYING to RUNNING (and stuck at DEPLOYING).

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we are seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how 
they go. We will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have already analyzed this deadlock case based on the code. FLINK-10491 already 
solved one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory while another 
CoGroup task is submitted and triggers the source task to release its memory, a 
deadlock might occur.
I will create a JIRA ticket for this issue and think about how to solve it soon. 
Currently, if you still want to use the blocking type, the simple way to avoid 
this is to configure only one slot per TM; then one task can never trigger 
another task to release memory in the same TM. Alternatively, you could increase 
the network buffer setting as a workaround, but I am not sure it would work for 
your case because it depends on the total data size the source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink 
v1.6.4 which we are using now, but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.fli

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-21 Thread zhijiang
Hi  Krishna,

Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
 
We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we started seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how they 
go. We will await the fix before using more slots on the single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have already analyzed this deadlock case based on the code. FLINK-10491 already 
solved one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory while another 
CoGroup task is submitted and triggers the source task to release its memory, a 
deadlock might occur.
I will create a JIRA ticket for this issue and think about how to solve it soon. 
Currently, if you still want to use the blocking type, the simple way to avoid 
this is to configure only one slot per TM; then one task can never trigger 
another task to release memory in the same TM. Alternatively, you could increase 
the network buffer setting as a workaround, but I am not sure it would work for 
your case because it depends on the total data size the source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink 
v1.6.4 which we are using now, but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apa

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I have already created the JIRA ticket [1] for it, and you can monitor it for 
progress.

In addition, the SpillableSubpartition will be abandoned from Flink 1.9 on, and 
Stephan has already implemented a new BoundedBlockingSubpartition to replace it. 
Of course, we will still provide support for the existing bugs in previous 
Flink versions.

[1] https://issues.apache.org/jira/browse/FLINK-12544

Best,
Zhijiang


--
From:Narayanaswamy, Krishna 
Send Time:2019年5月17日(星期五) 19:00
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how they 
go. We will await the fix before using more slots on the single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have already analyzed this deadlock case based on the code. FLINK-10491 already 
solved one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory while another 
CoGroup task is submitted and triggers the source task to release its memory, a 
deadlock might occur.
I will create a JIRA ticket for this issue and think about how to solve it soon. 
Currently, if you still want to use the blocking type, the simple way to avoid 
this is to configure only one slot per TM; then one task can never trigger 
another task to release memory in the same TM. Alternatively, you could increase 
the network buffer setting as a workaround, but I am not sure it would work for 
your case because it depends on the total data size the source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink 
v1.6.4 which we are using now, but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(Networ

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I have already analyzed this deadlock case based on the code. FLINK-10491 already 
solved one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.

When the source task is trying to release subpartition memory while another 
CoGroup task is submitted and triggers the source task to release its memory, a 
deadlock might occur.

I will create a JIRA ticket for this issue and think about how to solve it soon. 
Currently, if you still want to use the blocking type, the simple way to avoid 
this is to configure only one slot per TM; then one task can never trigger 
another task to release memory in the same TM. Alternatively, you could increase 
the network buffer setting as a workaround, but I am not sure it would work for 
your case because it depends on the total data size the source produces.
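For illustration, the single-slot workaround is a one-line change in the standard 
flink-conf.yaml (the fraction value below is only an example for the buffer-based 
workaround, not a recommendation):

taskmanager.numberOfTaskSlots: 1          # one slot per TM, so no task can trigger
                                          # another task in the same TM to release memory
# or, alternatively, give the network stack more memory (the default fraction is 0.1):
taskmanager.network.memory.fraction: 0.2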

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink 
v1.6.4 which we are using now, but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
- locked <0x00063c785350> (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
- locked <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
- waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
- locked <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.

Re: Netty channel closed at AKKA gated status

2019-04-22 Thread zhijiang
Hi Wenrui,

I think you could trace the log of the node manager, which contains the 
lifecycle of this task executor. Maybe this task executor was killed by the node 
manager because of memory overuse.
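If it helps, the container logs can be pulled with the standard YARN CLI (the 
application id below is a placeholder) and searched for kill or memory-limit 
messages, in addition to checking the NodeManager log on that host:

yarn logs -applicationId <application_XXXXXXXXXX_YYYY> | grep -i -e kill -e "beyond physical memory"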

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019年4月20日(星期六) 09:48
To:zhijiang 
Cc:Biao Liu ; user ; tzulitai 

Subject:Re: Netty channel closed at AKKA gated status

Attached the last lines of the lost task manager's log. Can anyone help take a 
look? 

Thanks,
Wenrui
On Fri, Apr 19, 2019 at 6:32 PM Wenrui Meng  wrote:
I looked at a few similar instances. The lost task manager was indeed not active 
anymore, since no log for that task manager was printed after the timestamp of 
the connection issue. I guess that task manager somehow died silently, without 
any exception or termination-relevant information being logged. I double-checked 
the lost task manager's host: the GC, CPU, memory, network, and disk I/O all look 
good without any spike. Is there any other way the task manager could have been 
terminated? We run our jobs in a YARN cluster. 
On Mon, Apr 15, 2019 at 10:47 PM zhijiang  wrote:
Hi Wenrui,

You might further check whether there is a network connection issue between the 
job master and the target task executor, if you have confirmed that the target 
task executor is still alive.

Best,
Zhijiang
--
From:Biao Liu 
Send Time:2019年4月16日(星期二) 10:14
To:Wenrui Meng 
Cc:zhijiang ; user ; 
tzulitai 
Subject:Re: Netty channel closed at AKKA gated status

Hi Wenrui,
If a task manager is killed (kill -9), it has no chance to log anything. If the 
task manager exits because of a connection timeout, there would be something in 
the log file. So it was probably killed by another user or by the operating 
system. Please check the operating system log. BTW, I don't think the "DEBUG log 
level" would help.
Wenrui Meng  于2019年4月16日周二 上午9:16写道:
There is no exception or any warning in the log of task manager 
`'athena592-phx2/10.80.118.166:44177'`. In addition, the host was not shut down 
either according to the cluster monitoring dashboard. It probably requires 
turning on DEBUG logging to get more useful information. If the task manager 
gets killed, I assume there will be a termination entry in the task manager log. 
If not, I don't know how to figure out whether it's due to the task manager 
being killed or just a connection timeout.



On Sun, Apr 14, 2019 at 7:22 PM zhijiang  wrote:
Hi Wenrui,

I think the Akka gated issue and the inactive Netty channel are both caused by 
some task manager exiting or being killed. You should double-check the status 
and the reason for this task manager `'athena592-phx2/10.80.118.166:44177'`.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019年4月13日(星期六) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the Netty channel inactive issue while Akka gated that task 
manager. I'm wondering whether the channel was closed because of the Akka gated 
status, since all messages to the TaskManager are dropped at that moment, which 
might cause a Netty channel exception. If so, shall we have coordination between 
Akka and Netty? The gated status is not intended to fail the system. Here is the 
stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) 
(93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChann

Re: Netty channel closed at AKKA gated status

2019-04-15 Thread zhijiang
Hi Wenrui,

You might further check whether there is a network connection issue between the 
job master and the target task executor, if you have confirmed that the target 
task executor is still alive.

Best,
Zhijiang
--
From:Biao Liu 
Send Time:2019年4月16日(星期二) 10:14
To:Wenrui Meng 
Cc:zhijiang ; user ; 
tzulitai 
Subject:Re: Netty channel closed at AKKA gated status

Hi Wenrui,
If a task manager is killed (kill -9), it has no chance to log anything. If the 
task manager exits because of a connection timeout, there would be something in 
the log file. So it was probably killed by another user or by the operating 
system. Please check the operating system log. BTW, I don't think the "DEBUG log 
level" would help.
Wenrui Meng  于2019年4月16日周二 上午9:16写道:
There is no exception or any warning in the log of task manager 
`'athena592-phx2/10.80.118.166:44177'`. In addition, the host was not shut down 
either according to the cluster monitoring dashboard. It probably requires 
turning on DEBUG logging to get more useful information. If the task manager 
gets killed, I assume there will be a termination entry in the task manager log. 
If not, I don't know how to figure out whether it's due to the task manager 
being killed or just a connection timeout.



On Sun, Apr 14, 2019 at 7:22 PM zhijiang  wrote:
Hi Wenrui,

I think the Akka gated issue and the inactive Netty channel are both caused by 
some task manager exiting or being killed. You should double-check the status 
and the reason for this task manager `'athena592-phx2/10.80.118.166:44177'`.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019年4月13日(星期六) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the Netty channel inactive issue while Akka gated that task 
manager. I'm wondering whether the channel was closed because of the Akka gated 
status, since all messages to the TaskManager are dropped at that moment, which 
might cause a Netty channel exception. If so, shall we have coordination between 
Akka and Netty? The gated status is not intended to fail the system. Here is the 
stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) 
(93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUn

Re: Can back pressure data be gathered by Flink metric system?

2019-04-15 Thread zhijiang
Hi Henry,

Thanks for the explanation. I am not sure whether it is feasible on your side 
to monitor the backpressure via the RESTful API provided by Flink.

Some experience from my side to share: we once monitored the backpressure via 
the metrics of out-queue length/usage on the producer side and in-queue 
length/usage on the consumer side. Although it is not always very accurate, it 
can provide some hints of backpressure, because the out-queue and in-queue 
should be filled with buffers between producer and consumer when backpressure 
occurs.
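For reference, the RESTful way I mentioned is the per-vertex back pressure 
sampler; a sketch of the call (the path is taken from the Flink 1.8 REST API, so 
please verify it against your version):

GET /jobs/<jobid>/vertices/<vertexid>/backpressure

It returns a sampled backpressure-level (ok/low/high) per subtask, which you 
could poll and plot like any other time series.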

Best,
Zhijiang
--
From:徐涛 
Send Time:2019年4月16日(星期二) 09:33
To:zhijiang 
Cc:user 
Subject:Re: Can back pressure data be gathered by Flink metric system?

Hi Zhijiang,
 Because I want to know the current value and the trend of the backpressure 
status in a Flink job. Like other indices such as latency, I can monitor it and 
show it in a graph by getting the data from metrics. Right now, using metrics to 
get the backpressure data is the simplest way I can think of.

Best
Henry

在 2019年4月15日,上午10:34,zhijiang  写道:
Hi Henry,

The backpressure tracking is not realized in the metric framework; you can check 
the details via [1]. I am not sure why your requirement is to expose 
backpressure as metrics.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html

Best,
Zhijiang

 --
From:徐涛 
Send Time:2019年4月15日(星期一) 10:19
To:user 
Subject:Can back pressure data be gathered by Flink metric system?

Hi Experts,
 From the Flink metric system page 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 I do not find any info about back pressure. I want to get the backpressure 
data and plot it, but I do not know how to get it via metrics. 
 Can anybody help me with it? Thanks a lot.

Best
Henry





Re: Can back pressure data be gathered by Flink metric system?

2019-04-14 Thread zhijiang
Hi Henry,

The backpressure tracking is not realized in the metric framework; you can check 
the details via [1]. I am not sure why your requirement is to expose 
backpressure as metrics.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html

Best,
Zhijiang

 --
From:徐涛 
Send Time:2019年4月15日(星期一) 10:19
To:user 
Subject:Can back pressure data be gathered by Flink metric system?

Hi Experts,
 From the Flink metric system page 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 I do not find any info about back pressure. I want to get the backpressure 
data and plot it, but I do not know how to get it via metrics. 
 Can anybody help me with it? Thanks a lot.

Best
Henry



Re: Retain metrics counters across task restarts

2019-04-14 Thread zhijiang
Hi Peter,

The lifecycle of these metrics is coupled with the lifecycle of the task, so the 
metrics are re-initialized after the task is restarted. One possible option is 
to store the required metric values in state; then they would be restored from 
the state backend after the task is restarted.
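A minimal sketch of that idea (the class and metric names are made up for 
illustration; the APIs are the standard CheckpointedFunction and metrics 
interfaces): the counter value is kept in operator state and pre-loaded into the 
metric on restore.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMap extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private transient Counter recordsProcessed;      // exposed to Prometheus
    private transient ListState<Long> counterState;  // survives restarts
    private long restoredCount;                      // filled in initializeState()

    @Override
    public void open(Configuration parameters) {
        // initializeState() runs before open(), so the restored value is ready;
        // pre-load it into a SimpleCounter before registering the metric.
        SimpleCounter counter = new SimpleCounter();
        counter.inc(restoredCount);
        recordsProcessed = getRuntimeContext().getMetricGroup()
                .counter("recordsProcessed", counter);
    }

    @Override
    public String map(String value) {
        recordsProcessed.inc();
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        counterState.clear();
        counterState.add(recordsProcessed.getCount());
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counterState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("recordsProcessed", Long.class));
        for (Long partial : counterState.get()) {
            restoredCount += partial;  // sum partial counts after rescaling
        }
    }
}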

Best,
Zhijiang
--
From:Peter Zende 
Send Time:2019年4月14日(星期日) 00:25
To:user 
Subject:Retain metrics counters across task restarts

Hi all

We're exposing metrics from our Flink (v1.7.1) pipeline to Prometheus, e.g. the 
total number of processed records. This works fine until any of the tasks is 
restarted within the YARN application. Then the counter is reset and starts 
incrementing from 0 again.
How can we retain such a counter through the entire lifetime of the YARN 
application, similarly to Hadoop counters?

Thanks
Peter



Re: Netty channel closed at AKKA gated status

2019-04-14 Thread zhijiang
Hi Wenrui,

I think the Akka gated issue and the inactive Netty channel are both caused by 
some task manager exiting or being killed. You should double-check the status 
and the reason for this task manager `'athena592-phx2/10.80.118.166:44177'`.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019年4月13日(星期六) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the Netty channel inactive issue while Akka gated that task 
manager. I'm wondering whether the channel was closed because of the Akka gated 
status, since all messages to the TaskManager are dropped at that moment, which 
might cause a Netty channel exception. If so, shall we have coordination between 
Akka and Netty? The gated status is not intended to fail the system. Here is the 
stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) 
(93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)



Re: Question regarding "Insufficient number of network buffers"

2019-04-11 Thread zhijiang
Hi Allen,

There are two ways of setting network buffers. The old way via 
`taskmanager.network.numberOfBuffers` is deprecated. The new way is via the 
three parameters min, max and fraction. 
The specific formula is Math.min(network.memory.max, 
Math.max(network.memory.min, network.memory.fraction * jvmMemory)). 
If both ways are set, only the new way takes effect. You can adjust these three 
parameters accordingly. 
You can also check the task manager log by searching for " MB for network 
buffer pool (number of memory segments: " to confirm whether your setting is 
working as expected.
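As a quick worked example against the numbers in your mail (plain arithmetic, 
assuming the documented defaults): the error reports 32768 segments of 32768 
bytes each, i.e. exactly 1 GB, which happens to be the default of 
taskmanager.network.memory.max. With your intended setting, the log line should 
instead show roughly:

networkBytes = min(102400m, max(102400m, 0.5 * jvmMemory)) = 102400 MB
segments     = 102400 MB / 32 KB = 3,276,800

So if the task manager log still shows 32768 segments, the new configuration 
values were most likely not picked up.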

Best,
Zhijiang
--
From:Xiangfeng Zhu 
Send Time:2019年4月12日(星期五) 08:03
To:user 
Subject:Question regarding "Insufficient number of network buffers"

Hello,

My name is Allen, and I'm currently researching different distributed execution 
engines. I wanted to run some benchmarks on Flink with a 10-node cluster (each 
node has 64 vCPUs and 376 GB memory). I ran the program with parallelism 320 and 
got an error message: 
"Caused by: java.io.IOException: Insufficient number of network buffers: 
required 320, but only 128 available. The total number of network buffers is 
currently set to 32768 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'." 

Currently, I set the following parameters:
jobmanager.heap.size: 102400m
taskmanager.memory.size: 102400m
taskmanager.numberOfTaskSlots: 32
taskmanager.network.memory.min: 102400m
taskmanager.network.memory.max: 102400m
taskmanager.network.memory.fraction: 0.5
(For the last three fields, I've also tried to set 
taskmanager.network.numberOfBuffers: 40960 directly)
Could you please give me some advice about how I should fix it?
Thank you so much! 

Best,
Allen



Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread zhijiang
Cool!

Finally we see the Flink 1.8.0 release. Thanks Aljoscha for this excellent work, 
and thanks to the other contributors for their efforts.

We will continue working hard for Flink 1.9.0.

Best,
Zhijiang
--
From:vino yang 
Send Time:2019年4月10日(星期三) 17:33
To:d...@flink.apache.org 
Cc:Aljoscha Krettek ; user ; 
announce 
Subject:Re: [ANNOUNCE] Apache Flink 1.8.0 released

Great news!

Thanks Aljoscha for being the release manager and thanks to all the 
contributors!

Best,
Vino
Driesprong, Fokko  于2019年4月10日周三 下午4:54写道:
Great news! Great effort by the community to make this happen. Thanks all!

 Cheers, Fokko

 Op wo 10 apr. 2019 om 10:50 schreef Shaoxuan Wang :

 > Thanks Aljoscha and all others who made contributions to FLINK 1.8.0.
 > Looking forward to FLINK 1.9.0.
 >
 > Regards,
 > Shaoxuan
 >
 > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek 
 > wrote:
 >
 > > The Apache Flink community is very happy to announce the release of
 > Apache
 > > Flink 1.8.0, which is the next major release.
 > >
 > > Apache Flink® is an open-source stream processing framework for
 > > distributed, high-performing, always-available, and accurate data
 > streaming
 > > applications.
 > >
 > > The release is available for download at:
 > > https://flink.apache.org/downloads.html
 > >
 > > Please check out the release blog post for an overview of the
 > improvements
 > > for this bugfix release:
 > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html
 > >
 > > The full release notes are available in Jira:
 > >
 > >
 > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274
 > >
 > > We would like to thank all contributors of the Apache Flink community who
 > > made this release possible!
 > >
 > > Regards,
 > > Aljoscha
 >



Re: Flink credit based flow control

2019-03-11 Thread zhijiang
Hi Brian,

Actually I also thought of adding the metrics you mentioned after contributing 
the credit-based flow control. It should sometimes help performance tuning. If 
you want to add this metric, you could trace the related process in 
`ResultSubpartition`. When the backlog is increased while adding a 
`BufferConsumer` to the queue, you can check the current credits for this 
subpartition. Another possible place is when the subpartition view changes from 
available to unavailable because of no credits; we could add some metrics there. 
But even if we find the sender is blocked because of no credits, we still need 
to distinguish two conditions: if the sender is backpressured, then the 
condition of no credits is within expectation. 

Maybe you do not need an extra metric to trace your problem. You can check 
whether the high-latency network is causing backpressure, and check what flush 
timeout you have configured. You can also trace the current metrics of 
out-queue usage/length and in-queue usage/length to find something. 
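For completeness, the queue metrics I mean are the per-task network metrics 
(names as documented for recent Flink versions; please double-check against 
your release):

buffers.inputQueueLength / buffers.outputQueueLength
buffers.inPoolUsage / buffers.outPoolUsage

If the sender is blocked waiting for credits, its output queue stays full 
(outPoolUsage close to 1), which these metrics make visible.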

Best,
Zhijiang
--
From:Brian Ramprasad 
Send Time:2019年3月12日(星期二) 03:47
To:user 
Subject:Flink credit based flow control

Hi,
I am trying to use the most recent version of Flink over a high-latency network, 
and I am trying to measure how long a sender may wait for credits before it can 
send buffers to the receiver. Does anyone know in which function/class I can 
measure, on the sender side, the time spent waiting for the incoming credit 
announcements?
Thanks
Brian R



Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-03 Thread zhijiang
Hi Arnaud,

I think I understand your special use case based on your further explanation. 
As you said, it is easy for the source to emit the whole set of file names into 
the network buffers, because an emitted file name is very small and the 
flatmap/sink processing is slow. Then when a checkpoint is triggered, the 
barrier is behind the whole set of file names; that means the sink cannot 
receive the barrier until it has read and written all the corresponding files. 

So the proper solution in your case is to control the emit rate on the source 
side based on the sink's catch-up progress, in order to avoid many files being 
queued in front of the barriers. This is the right thing to try, and I hope 
your solution with the 2 parameters works.
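A rough sketch of such a throttled source under the assumptions above (the class 
name, the two parameters and listNewFiles() are made up for illustration; the 
actual HDFS listing is omitted):

import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class ThrottledFileNameSource extends RichSourceFunction<String> {

    private final int catchUpThreshold;   // backlog size that switches throttling on
    private final double maxFilesPerSec;  // emit cap while catching up
    private volatile boolean running = true;

    public ThrottledFileNameSource(int catchUpThreshold, double maxFilesPerSec) {
        this.catchUpThreshold = catchUpThreshold;
        this.maxFilesPerSec = maxFilesPerSec;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            List<String> pending = listNewFiles();
            if (pending.isEmpty()) {
                Thread.sleep(1000);  // nothing to do, avoid a busy loop
                continue;
            }
            // catch-up mode: many files are waiting, so cap the emit rate
            boolean throttle = pending.size() > catchUpThreshold;
            for (String fileName : pending) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(fileName);
                }
                if (throttle) {
                    // sleeping outside the checkpoint lock lets barriers through
                    Thread.sleep((long) (1000 / maxFilesPerSec));
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private List<String> listNewFiles() {
        return Collections.emptyList();  // HDFS directory listing goes here
    }
}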

Best,
Zhijiang


--
From:LINZ, Arnaud 
Send Time:2019年3月2日(星期六) 16:45
To:zhijiang ; user 
Subject:RE: Checkpoints and catch-up burst (heavy back pressure)

 Hello,
 When I think about it, I figure out that a barrier from the source comes after 
the whole set of files, and therefore the checkpoint will never complete until 
the sink has caught up.
 The simplest way to deal with it without refactoring is to add 2 parameters to 
the source: a file-number threshold detecting the catch-up mode, and a 
max-files-per-second limitation applied when this occurs, set slightly lower 
than the natural catch-up rate.

  Original message 
 From: "LINZ, Arnaud" 
 Date: Fri, Mar 01, 2019 2:04 PM +0100
 To: zhijiang , user 
 Subject: RE: Checkpoints and catch-up burst (heavy back pressure)

Hi,
I think I should go into more details to explain my use case.
I have one non-parallel source (parallelism = 1) that lists binary files in an 
HDFS directory. The DataSet emitted by the source is a set of file names, not 
file contents. These file names are rebalanced and sent to workers (parallelism 
= 15) that use a flatmapper that opens each file, reads it, decodes it, and 
sends records (forward mode) to the sinks (with a few 1-to-1 mappings 
in-between). So the flatmap operation is a time-consuming one, as each file is 
more than 200 MB; the flatmapper will emit millions of records to the sink for 
one source record (file name).
The rebalancing, occurring at the file-name level, does not use much I/O, and I 
cannot use one-to-one mode at that point if I want some parallelism, since I 
have only one source.
I did not put file decoding directly in the sources because I have no good way 
to distribute files to sources without a controller (input directory is unique, 
filenames are random and cannot be “attributed” to one particular source 
instance easily). 
Alternatively, I could have used a dispatcher daemon, separate from the 
streaming app, that distributes files to various directories, each directory 
being associated with one Flink source instance, and put the file reading & 
decoding directly in the source. But that seemed more complex to code and 
operate than the filename source. Would it have been better from the 
checkpointing perspective?
About the ungraceful source sleep(): is there a way, programmatically, to know 
the “load” of the app, or to determine whether checkpointing is taking too much 
time, so that I throttle only when needed?
Thanks,
Arnaud
From: zhijiang  
Sent: Friday, March 1, 2019 04:59
To: user ; LINZ, Arnaud 
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
Thanks for the further feedback!
For option 1: 40 min still does not make sense, which indicates it might take 
even more time to finish a checkpoint in your case. I have also experienced 
scenarios where catching up data took several hours to finish one checkpoint. If 
the current checkpoint expires because of the timeout, the next triggered 
checkpoint might still fail by timeout. So it seems better to wait for the 
current checkpoint until it finishes rather than let it expire, unless we cannot 
bear this long time for some reason, such as a failover having to restore more 
data accumulated during this time.
For option 2: the default network settings should make sense. Lower values might 
cause performance regression, and higher values would increase the in-flight 
buffers and delay the checkpoint more seriously.
For option 3: if the resources are limited, it is still not workable on your side.
It is an option and might work in your case to sleep for some time in the source 
as you mentioned, although it seems not a graceful way.
I think there is no data skew in your case to cause backpressure, because you 
used the rebalance mode as mentioned. Another option might be to use the forward 
mode, which would be better than the rebalance mode if possible in your case. 
Because the source and the downstream task are one-to-one in forward mode, the 
total in-flight buffers are 2+2+8 for one single downstream task before the 
barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) for 
one single downstream task (`a` is the parallelism of the source vertex), 
because it is an all-to-all connection.

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud,

Thanks for the further feedback!

For option 1: 40 min still does not make sense, which indicates it might take 
even more time to finish a checkpoint in your case. I have also experienced 
scenarios where catching up data took several hours to finish one checkpoint. If 
the current checkpoint expires because of the timeout, the next triggered 
checkpoint might still fail by timeout. So it seems better to wait for the 
current checkpoint until it finishes rather than let it expire, unless we cannot 
bear this long time for some reason, such as a failover having to restore more 
data accumulated during this time.

For option 2: the default network settings should make sense. Lower values might 
cause performance regression, and higher values would increase the in-flight 
buffers and delay the checkpoint more seriously.

For option 3: if the resources are limited, it is still not workable on your side.

It is an option and might work in your case to sleep for some time in the source 
as you mentioned, although it seems not a graceful way.

I think there is no data skew in your case to cause backpressure, because you 
used the rebalance mode as mentioned. Another option might be to use the forward 
mode, which would be better than the rebalance mode if possible in your case. 
Because the source and the downstream task are one-to-one in forward mode, the 
total in-flight buffers are 2+2+8 for one single downstream task before the 
barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) for 
one single downstream task (`a` is the parallelism of the source vertex), 
because it is an all-to-all connection. The barrier alignment takes more time in 
rebalance mode than in forward mode.

Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019年3月1日(星期五) 00:46
To:zhijiang ; user 
Subject:RE: Checkpoints and catch-up burst (heavy back pressure)


Update :
Option 1 does not work. It still fails at the end of the timeout, no matter its 
value.
Should I implement a “bandwidth” management system by using artificial 
Thread.sleep in the source, depending on the back pressure? 
From: LINZ, Arnaud 
Sent: Thursday, February 28, 2019 15:47
To: 'zhijiang' ; user 
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
Hi Zhijiang,
Thanks for your feedback.
I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and 
will let you know. Setting it higher than 40 min does not make much sense, since 
after 40 min the pending output is already quite large.
Option 3 won’t work; I already take too many resources, and as my source is 
more or less an HDFS directory listing, it will always be far faster than any 
mapper that reads a file and emits records based on its content, or any sink 
that stores the transformed data, unless I put “sleeps” in it (but is this 
really a good idea?)
Option 2: taskmanager.network.memory.buffers-per-channel and 
taskmanager.network.memory.buffers-per-gate are currently unset in my 
configuration (so at their defaults of 2 and 8), but for this streaming app I 
have very few exchanges between nodes (just a rebalance after the source that 
emits file names; everything else is local to the node). Should I adjust their 
values nonetheless? To higher or lower values?
Best,
Arnaud
From: zhijiang  
Sent: Thursday, February 28, 2019 10:58
To: user ; LINZ, Arnaud 
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
I think there are two key points. First, the checkpoint barrier might be emitted 
with delay from the source under high backpressure, due to the synchronization 
lock. 
Second, the barrier has to be queued behind the in-flight data buffers, so the 
downstream task has to process all the buffers ahead of the barrier before 
triggering the checkpoint, and this takes some time under back pressure.
There are three ways to work around this:
1. Increase the checkpoint timeout to avoid expiry after a short time.
2. Decrease the network buffer settings to reduce the amount of in-flight 
buffers ahead of the barrier; you can check the config options 
"taskmanager.network.memory.buffers-per-channel" and 
"taskmanager.network.memory.buffers-per-gate".
3. Adjust the parallelism, e.g. increase it for the sink vertex in order to 
process the source data faster and avoid backpressure to some extent.
You could check which way is suitable for your scenario and give it a try.
Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019年2月28日(星期四) 17:28
To:user 
Subject:Checkpoints and catch-up burst (heavy back pressure)
Hello,
I have a simple streaming app that gets data from a source and stores it to HDFS 
using a sink similar to the bucketing file sink. The checkpointing mode is 
“exactly once”.
Everything is fine during a “normal” run, as the sink is faster than the source; 
but when we stop the application for a while and then restart it, we have a 
catch-up burst to process all the messages emitted in the meanwhile.
During this burst, the source is faster than the sink, and all checkpoints fail 
(time out) until the source has totally caught up.

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul,

Thanks for your feedback. If the at-least-once mode still causes the problem, 
we can confirm it is not caused by the blocking behavior of exactly-once mode 
mentioned before.

For at-least-once, the task continues processing the buffers following the 
barriers during alignment. For exactly-once, the task blocks the channel after 
reading a barrier during alignment. It is difficult to confirm this based on the 
absolute state size, which also depends on the network buffer settings and the 
parallelism.

I think your problem is that once checkpointing is activated, backpressure also 
sometimes occurs, which results in low performance and low CPU usage. 
Maybe we can analyze which vertex and subtask cause the backpressure and then 
trace their jstacks to check which operations slow them down. The source vertex 
is blocked in `requestBufferBuilder` once backpressured. But I am not sure 
whether the backpressure is caused by the middle vertex or by the last sink 
vertex. If the middle vertex is also blocked requesting buffers, as you 
mentioned in the first email, then the backpressure should be caused by the last 
sink vertex. But then the jstack of the last sink task should not be in 
`getNextBuffer`, unless you did not trace all the parallel sink tasks.  

My suggestion is first to check which vertex causes the backpressure (the middle 
or the last sink vertex?), and then trace the jstack of the relevant parallel 
task in that vertex; you can select the task with the largest in-queue length. I 
think we might find something by seeing which operation delays the task and 
causes the backpressure, and this operation might be involved with HDFS. :)

Best,
Zhijiang


--
From:Paul Lam 
Send Time:2019年2月28日(星期四) 19:17
To:zhijiang 
Cc:user 
Subject:Re: Flink performance drops when async checkpoint is slow

Hi Zhijiang,

Thanks a lot for your reasoning! 

I tried to set the checkpoint mode to at-least-once as you suggested, but 
unluckily the problem remains the same :(

IMHO, if it were caused by barrier alignment, the state size (mainly the buffers 
during alignment) would be big, right? But actually it's not, so we didn't 
think that way before.

Best,
Paul Lam

在 2019年2月28日,16:12,zhijiang  写道:
Hi Paul,

I am not sure whether the task thread is involved in some work during 
snapshotting the state for the FsStateBackend. But I have another experience 
which might also explain your problem.
From your descriptions below, the last task is blocked in 
`SingleInputGate.getNextBufferOrEvent`, which means the middle task does not 
have any outputs or the middle operator does not process records.
The backpressure is high between the source and the middle task, which results 
in blocking the source task in `requestBufferBuilder`.

Based on the above two points, I guess the middle task is waiting for barriers 
from some source tasks. For the input channels which have already received the 
barriers, the middle task does not process the following data buffers and just 
caches them, so it backpressures the corresponding sources based on 
credit-based flow control.  For the input channels without barriers, if there 
are also no data buffers, then the middle task would not have any outputs. So I 
think one hint is to trace why some source tasks emit their barriers with delay.

In order to double-check the above analysis, you can change the checkpoint mode 
from `exactly-once` to `at-least-once`; if the CPU usage and task TPS do not 
decrease for a period as before, I think we can confirm the above analysis. :)

Best,
Zhijiang
--
From:Paul Lam 
Send Time:2019年2月28日(星期四) 15:17
To:user 
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from Kafka topic, does some 
transformations and aggregates, and write to two Kafka topics respectively. 
Meanwhile, there’s a custom source that pulls configurations for the 
transformations periodically. The generic job graph is as below.

<屏幕快照 2019-02-25 11.24.54.png>

The job uses the FsStateBackend and checkpoints to HDFS, but HDFS’s load is 
unstable, and sometimes the HDFS client reports slow reads and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job’s consume 
rate drops significantly, and some taskmanagers’ CPU usage drops from about 140% 
to 1%; all the task threads on those taskmanagers are blocked. This situation 
lasts from seconds to a minute. We started a parallel job with everything the 
same except checkpointing disabled, and it runs very steadily.
But I think, as the checkpointing is async, it should not affect the task 
threads.

There is some additional information that we observed:

-  When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked requesting a memory buffer (with back pressure close 
to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffered data during alignment is less than 10 MB, 
even when back pressure is high.

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud,

I think there are two key points. First, the checkpoint barrier might be emitted 
with delay from the source under high backpressure, due to the synchronization 
lock. 
Second, the barrier has to be queued behind the in-flight data buffers, so the 
downstream task has to process all the buffers ahead of the barrier before 
triggering the checkpoint, and this takes some time under back pressure.

There are three ways to work around this:
1. Increase the checkpoint timeout to avoid expiry after a short time.
2. Decrease the network buffer settings to reduce the amount of in-flight 
buffers ahead of the barrier; you can check the config options 
"taskmanager.network.memory.buffers-per-channel" and 
"taskmanager.network.memory.buffers-per-gate".
3. Adjust the parallelism, e.g. increase it for the sink vertex in order to 
process the source data faster and avoid backpressure to some extent.

You could check which way is suitable for your scenario and give it a try.
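For the first workaround, a sketch with the standard API (the values are only 
examples; the default checkpoint timeout is 10 minutes):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                 // trigger every 60 s
env.getCheckpointConfig().setCheckpointTimeout(40 * 60 * 1000);  // allow 40 min under catch-up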

Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019年2月28日(星期四) 17:28
To:user 
Subject:Checkpoints and catch-up burst (heavy back pressure)


Hello,
I have a simple streaming app that gets data from a source and stores it to HDFS 
using a sink similar to the bucketing file sink. The checkpointing mode is 
“exactly once”.
Everything is fine during a “normal” run, as the sink is faster than the source; 
but when we stop the application for a while and then restart it, we have a 
catch-up burst to process all the messages emitted in the meanwhile.
During this burst, the source is faster than the sink, and all checkpoints fail 
(time out) until the source has totally caught up. This is annoying because the 
sink does not “commit” the data before a successful checkpoint is made, so the 
app releases all the “catch-up” data as one atomic block, which can be huge if 
the streaming app was stopped for a while, adding unwanted stress to all the 
following Hive treatments that use the data provided in micro-batches, and to 
the Hadoop cluster.
How should I handle this situation? Is there something special to do to get 
checkpoints even under heavy load?
The problem does not seem to be new, but I was unable to find any practical 
solution in the documentation.
Best regards,
Arnaud





Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul,

I am not sure whether task thread is involverd in some works during snapshoting 
states for FsStateBackend. But I have another experience which might also cause 
your problem.
From your descriptions below, the last task is blocked by 
`SingleInputGate.getNextBufferOrEvent` that means the middle task does not have 
any outpus or the middle operator does not process records.
The backpressure is high between source and middle task which results in 
blocking the source task in `requestBufferBuilder`.

Based on the above two points, I guess the middle task is waiting for barriers 
from some source tasks. For the input channels which have already received their 
barriers, the middle task does not process the following data buffers and just 
caches them, which back-pressures the corresponding sources via credit-based 
flow control. For the input channels without barriers, if there are also no 
data buffers, the middle task has no outputs either. So one hint is to trace 
why some source task emits its barrier late.

To double-check the above analysis, you can change the checkpoint mode from 
`exactly-once` to `at-least-once`; if the CPU usage and task TPS no longer drop 
for a period as before, I think we can confirm the analysis. :)
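
For reference, a minimal sketch of switching the mode in the Java DataStream 
API (the 60 s interval is just an example):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // AT_LEAST_ONCE skips barrier alignment, so alignment-induced blocking disappears.
    env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);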

Best,
Zhijiang
--
From:Paul Lam 
Send Time:2019年2月28日(星期四) 15:17
To:user 
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some 
transformations and aggregations, and writes to two Kafka topics respectively. 
Meanwhile, there's a custom source that pulls configurations for the 
transformations periodically. The generic job graph is as below.



The job uses the FsStateBackend and checkpoints to HDFS, but the HDFS load is 
unstable, and sometimes the HDFS client reports slow reads and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job's consume 
rate drops significantly, some taskmanagers' CPU usage drops from about 140% to 
1%, and all the task threads on those taskmanagers are blocked. This situation 
lasts from seconds to a minute. We started a parallel job with everything the 
same except checkpointing disabled, and it runs very steadily.
But since the checkpointing is async, I think it should not affect the task 
threads.

There are some additional information that we observed:

- When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked requesting memory buffers (with back pressure close 
to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffered data during alignment is less than 10 MB, 
even when back pressure is high.

We’ve been struggling with this problem for weeks, and any help is appreciated. 
Thanks a lot!

Best,
Paul Lam



[Attachment: Screenshot 2019-02-25 11.24.54.png]


Re: Confusion in Heartbeat configurations

2019-02-18 Thread zhijiang
Hi sohimankotia,

In order not to rely too strongly on the Akka implementation, Flink implements 
its own heartbeat mechanism (since FLIP-6) to monitor the health of the 
TaskExecutor, JobMaster and ResourceManager components. So you see two sets of 
heartbeat settings: the ones prefixed with `akka` configure Akka's internal 
implementation, and the others configure Flink's own implementation. A small 
configuration sketch follows below.

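For illustration, a sketch of setting both pairs programmatically (the values 
shown are just examples; on a cluster these keys normally go into 
flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // Flink's own heartbeat between JobMaster/ResourceManager and TaskExecutors.
    conf.setLong("heartbeat.interval", 10_000L); // how often heartbeats are sent
    conf.setLong("heartbeat.timeout", 50_000L);  // when a target is considered lost
    // Akka's DeathWatch heartbeat for the underlying actor connections.
    conf.setString("akka.watch.heartbeat.interval", "10 s");
    conf.setString("akka.watch.heartbeat.pause", "60 s");
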
Best,
Zhijiang
--
From:sohimankotia 
Send Time:2019年2月18日(星期一) 14:40
To:user 
Subject:Confusion in Heartbeat configurations

Hi, 

In the
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html
page, two heartbeat configs are mentioned:

akka.watch.heartbeat.interval
akka.watch.heartbeat.pause

Vs

heartbeat.interval
heartbeat.timeout


Can you please explain what exactly the difference is between them, and which
component of the job execution graph they impact?

Thanks




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread zhijiang
Thanks Stephan for this proposal and I totally agree with it. 

It is very necessary to summarize the overall features/directions the community 
is pursuing or planning to pursue. Although I check the mailing list almost 
every day, it still seems difficult to trace everything. In addition, I think 
this whole roadmap picture can also help expose the relationships among 
different items, and even avoid similar/duplicated thoughts or work.

Just one small suggestion: if we could add an existing link 
(jira/discussion/FLIP/google doc) for each listed item, it would be easy to 
keep track of the interesting ones and follow their progress.

Best,
Zhijiang
--
From:Jeff Zhang 
Send Time:2019年2月14日(星期四) 18:03
To:Stephan Ewen 
Cc:dev ; user ; jincheng sun 
; Shuyi Chen ; Rong Rong 

Subject:Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

Hi Stephan,

Thanks for this proposal. It is a good idea to track the roadmap. One
suggestion is that it might be better to put it into a wiki page first,
because it is easier to update the roadmap on the wiki than on the Flink
website. And I guess we may need to update the roadmap very often at the
beginning, as there are so many discussions and proposals in the community
recently. We can move it to the Flink website later when we feel it can be
nailed down.

Stephan Ewen  于2019年2月14日周四 下午5:44写道:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should be
> developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes, we can add them
> (can you point me to the issue/mail-thread), if not, let's try and move the
> discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for contributors
>> to keep track on all upcoming features.
>>
>> I think that better window operator support can also be separately grouped
>> into its own category, as it affects both the future DataStream API and
>> batch-stream unification.
>> Can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more suggestion: can we also include the more extensible
>> security module [2,3] that @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>> wrote:
>>
>>> Very excited and thank you for launching such a great discussion,
>>> Stephan !
>>>
>>> Just one little suggestion: in the Batch Streaming Unification
>>> section, do we need to add an item:
>>>
>>> - Same window operators on bounded/unbounded Table API and DataStream
>>> API
>>> (currently the OVER window only exists in SQL/Table API; the DataStream API
>>> does not support it yet)
>>>
>>> Best,
>>> Jincheng
>>>
>>> Stephan Ewen  于2019年2月13日周三 下午7:21写道:
>>>
>>>> Hi all!
>>>>
>>>> Recently several contributors, committers, and users asked about making
>>>> it more visible in which way the project is currently going.
>>>>
>>>> Users and developers can track the direction by following the
>>>> discussion threads and JIRA, but due to the mass of discussions and open
>>>> issues, it is very hard to get a good overall picture.
>>>> Especially for new users and contributors, it is very hard to get a
>>>> quick overview of the project direction.
>>>>
>>>> To fix this, I suggest to add a brief roadmap summary to the homepage.
>>>> It is a bit of a commitment to keep that roadmap up to date, but I think
>>>> the benefit for users justifies that.
>>>> The Apach

Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread zhijiang
Congrats Thomas!

Best,
Zhijiang
--
From:Kostas Kloudas 
Send Time:2019年2月12日(星期二) 22:46
To:Jark Wu 
Cc:Hequn Cheng ; Stefan Richter 
; user 
Subject:Re: [ANNOUNCE] New Flink PMC member Thomas Weise

Congratulations Thomas!

Best,
Kostas
On Tue, Feb 12, 2019 at 12:39 PM Jark Wu  wrote:
Congrats Thomas!
On Tue, 12 Feb 2019 at 18:58, Hequn Cheng  wrote:
Congrats Thomas!

Best, Hequn


On Tue, Feb 12, 2019 at 6:53 PM Stefan Richter  
wrote:
Congrats Thomas!,

Best,
Stefan

Am 12.02.2019 um 11:20 schrieb Stephen Connolly 
:
Congratulations to Thomas. I see that this is not his first time in the PMC 
rodeo... also somebody needs to update LDAP as he's not on 
https://people.apache.org/phonebook.html?pmc=flink yet!

-stephenc
On Tue, 12 Feb 2019 at 09:59, Fabian Hueske  wrote:
Hi everyone,

On behalf of the Flink PMC I am happy to announce Thomas Weise as a new member 
of the Apache Flink PMC.

Thomas is a long time contributor and member of our community. 
He is starting and participating in lots of discussions on our mailing lists, 
working on topics that are of joint interest of Flink and Beam, and giving 
talks on Flink at many events.

Please join me in welcoming and congratulating Thomas!

Best,
Fabian



-- 
Kostas Kloudas | Software Engineer


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen 



Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread zhijiang
Hi Wenrui,

I suspect another issue which might cause the connection failure. You can check 
whether the netty server has already bound and is listening on its port before 
the client requests the connection. If there is some time-consuming process 
during TM startup which delays the netty server start, then when the client 
requests the connection the server is not ready yet, which may cause a 
connection timeout or failure.

From your description, it seems to affect only some TMs: when you decrease the 
total parallelism, the job may miss the problematic TM and thus not hit this 
issue. The default number of netty threads and the default timeout should make 
sense for normal cases.

Best,
Zhijiang


--
From:Wenrui Meng 
Send Time:2019年1月9日(星期三) 18:18
To:Till Rohrmann 
Cc:user ; Konstantin 
Subject:Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried to 
ping the connected host from the connecting host, and the connection seems very 
stable. As for the connection timeout, I do set it to 20 min, but it still 
reports a timeout after 2 minutes. Could you let me know how you test the 
timeout setting locally?

Thanks,
Wenrui
On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure 
whether this is so different. Could it be that your network is overloaded or 
not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till
On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we 
decrease the total parallelism, the timeout issue can be avoided, but we do need 
that number of taskManagers to process the data. In addition, once I increase 
the netty server threads to 128, the error changes to the following one. It 
seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task 
manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate 
that the remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has 
failed. This might indicate that the remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424

Re: Buffer stats when Back Pressure is high

2019-01-07 Thread zhijiang
Hi Gagan,

Which Flink version do you use? And have you checked buffers.inputQueueLength 
for all the parallel instances of B (connected with A)? There may be a scenario 
where only one parallel instance of B is full of in-queue buffers, which 
back-pressures A, while the input queues of the other B instances are empty.

Best,
Zhijiang


--
From:Gagan Agrawal 
Send Time:2019年1月7日(星期一) 12:06
To:user 
Subject:Buffer stats when Back Pressure is high

Hi,
I want to understand whether any of the buffer stats help in debugging / 
validating that a downstream operator is performing slowly when back pressure is 
high. Say I have operators A -> B, and A shows high back pressure, which 
indicates something wrong or not performing well on B's side that slows down 
operator A. However, when I look at buffers.inputQueueLength for operator B, it 
is 0. My understanding is that when B is processing slowly, its input buffer 
will be full of incoming messages, which ultimately blocks/slows down the 
upstream operator A. However, that does not seem to be happening in my case. Can 
someone throw some light on what the different buffer stats (e.g. 
buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond, 
numBuffersInRemotePerSecond) should look like when a downstream operator is 
performing slowly?

Gagan



Re: buffer pool is destroyed

2018-12-21 Thread zhijiang
Hi Shuang,

Normally the exception you mentioned is not the root cause of the failover; it 
is mainly caused by the cancel process making the task exit.
You can further check whether there are other failures in the job master log to 
find the root cause.

Best,
Zhijiang
--
From:Chan, Shuang 
Send Time:2018年12月21日(星期五) 11:12
To:user@flink.apache.org 
Subject:buffer pool is destroyed


Hi Flink community,
I have a custom source that emits a user-defined data type, BaseEvent. The 
following code works fine when BaseEvent is not a POJO.
But when I changed it to a POJO by adding a default constructor, I get a 
“Buffer Pool is destroyed” runtime exception in the collect method.
DataStream<BaseEvent> eventStream = see.addSource(new 
AgoraSource(configFile, instance));
DataStream<Tuple4<String, Long, Double, String>> result_order = 
eventStream
        .filter(e -> e instanceof OrderEvent)
        .map(e -> (OrderEvent) e)
        .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), 
                Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
        .keyBy(e -> e.f0)
        .timeWindow(Time.seconds(5))
        .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
        .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, 
                "Order")).returns(info_tuple4);
Any idea?
Shuang


==
Please access the attached hyperlink for an important electronic communications 
disclaimer:
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
==



Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread zhijiang
It may be worked around by increasing the task manager memory size.

Whether the recovery succeeds depends on several things: whether there was a 
successful checkpoint before, whether the states are available, and what the 
failover strategy is. A small restart-strategy sketch follows below.

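For illustration, a minimal sketch of configuring a restart strategy in Java 
(the retry count and delay are only example values):

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Retry 3 times, 10 s apart; on restart the job resumes from the last
    // completed checkpoint if one is available.
    env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
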
Best,
Zhijiang
--
From:Flink Developer 
Send Time:2018年11月26日(星期一) 16:37
To:Flink Developer 
Cc:zhijiang ; user ; Gagan 
Agrawal 
Subject:Re: Re: Flink job failing due to "Container is running beyond physical 
memory limits" error.

Also, after the Flink job has failed with the above error, it is unable to 
recover from the previous checkpoint. Is this the expected behavior? How can 
the job be recovered successfully?


‐‐‐ Original Message ‐‐‐
 On Monday, November 26, 2018 12:35 AM, Flink Developer 
 wrote:

I am also experiencing this error message "Container is running beyond physical 
memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 
40 slots for each task manager. The memory assigned during flink cluster 
creation is 1024MB per task manager. The checkpoint is using RocksDB and the 
checkpoint size is very small (10MB).

Is the simple solution to increase the Task Manager memory size? I will try 
going from 1024MB to 4096MB per task manager.

‐‐‐ Original Message ‐‐‐
On Sunday, November 25, 2018 7:58 PM, zhijiang  
wrote:

I think it is probably related to RocksDB memory usage, if you have not seen an 
OutOfMemory issue before.

There is already a JIRA ticket [1] for fixing this issue, and you can watch it 
for updates. :)

[1] https://issues.apache.org/jira/browse/FLINK-10884

Best,
Zhijiang
--
From:Gagan Agrawal 
Send Time:2018年11月24日(星期六) 14:14
To:user 
Subject:Flink job failing due to "Container is running beyond physical memory 
limits" error.

Hi,
I am running a Flink job on YARN which ran fine so far (4-5 days) and has now 
started failing with the following errors.

2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1542008917197_0038_01_06 because: Container 
[pid=18380,containerID=container_1542008917197_0038_01_06] is running 
beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory 
used; 5.0 GB of 15 GB virtual memory used. Killing container.

This is a simple job where we read 2 Avro streams from Kafka, apply some custom 
UDFs after creating a keyed stream from the union of those 2 streams, and write 
the output back to Kafka. The UDF internally uses MapState with the RocksDB 
backend. Currently the checkpoint size is around 300 GB and we are running this 
with 10 task managers with 3 GB memory each. I have also set 
"containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The 
Flink version is 1.6.2.

Here is the flink command
./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar

I want to understand what the typical reasons for this issue are. Also, why 
would Flink consume more memory than allocated, given that the JVM heap is fixed 
and cannot grow beyond the max heap? Can this be related to RocksDB consuming 
memory outside the heap and hence exceeding the defined limits? I didn't see 
this issue when the checkpoint size was small (<50 GB), but ever since we 
reached 300 GB, it occurs frequently. I can try increasing the memory, but I am 
still interested in knowing the typical reasons for this error if the JVM heap 
cannot grow beyond its defined limit.

Gagan








Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-25 Thread zhijiang
I think it is probably related to RocksDB memory usage, if you have not seen an 
OutOfMemory issue before.

There is already a JIRA ticket [1] for fixing this issue, and you can watch it 
for updates. :)

[1] https://issues.apache.org/jira/browse/FLINK-10884

Best,
Zhijiang
--
From:Gagan Agrawal 
Send Time:2018年11月24日(星期六) 14:14
To:user 
Subject:Flink job failing due to "Container is running beyond physical memory 
limits" error.

Hi,
I am running a Flink job on YARN which ran fine so far (4-5 days) and has now 
started failing with the following errors.

2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1542008917197_0038_01_06 because: Container 
[pid=18380,containerID=container_1542008917197_0038_01_06] is running 
beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory 
used; 5.0 GB of 15 GB virtual memory used. Killing container.

This is a simple job where we read 2 Avro streams from Kafka, apply some custom 
UDFs after creating a keyed stream from the union of those 2 streams, and write 
the output back to Kafka. The UDF internally uses MapState with the RocksDB 
backend. Currently the checkpoint size is around 300 GB and we are running this 
with 10 task managers with 3 GB memory each. I have also set 
"containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The 
Flink version is 1.6.2.

Here is the flink command
./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar

I want to understand what the typical reasons for this issue are. Also, why 
would Flink consume more memory than allocated, given that the JVM heap is fixed 
and cannot grow beyond the max heap? Can this be related to RocksDB consuming 
memory outside the heap and hence exceeding the defined limits? I didn't see 
this issue when the checkpoint size was small (<50 GB), but ever since we 
reached 300 GB, it occurs frequently. I can try increasing the memory, but I am 
still interested in knowing the typical reasons for this error if the JVM heap 
cannot grow beyond its defined limit.

Gagan





Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

Sorry, I have not thought of a proper way to handle a single large record 
across distributed task managers in Flink. But I can give some hints on 
adjusting the related memory settings to work around the OOM issue.
A large fraction of the memory in a task manager is managed by Flink for 
efficiency, and this memory is long-lived in the JVM and not recycled by GC. You 
can check the parameter "taskmanager.memory.fraction" for this; the default 
value is 0.7 if you have not changed it, which means 7GB * 0.7 is used by the 
framework.

I am not sure which Flink version you use. If I remember correctly, before 
release-1.5 the network buffers also used heap memory by default, so you should 
also subtract that part from the total task manager memory.

Not counting the network buffers used by the framework, that leaves only 
7GB * 0.3 of temporary memory for the other parts. The temporary buffer inside a 
serializer doubles its current size whenever it cannot hold the record, which 
means one serializer may need 2GB of overhead memory for your 1GB record. You 
have 2 slots per task manager running two tasks, so the total overhead may be 
almost 4GB. So you can decrease "taskmanager.memory.fraction" to a lower value, 
increase the total task manager memory to cover this overhead, or set one slot 
per task manager; see the sketch below.

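For illustration, a sketch of those two knobs (example values only; on a 
cluster these keys normally go into flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // Leave more free heap for serializer overhead (default fraction is 0.7).
    conf.setFloat("taskmanager.memory.fraction", 0.5f);
    // One slot per task manager so a single task gets all the free heap.
    conf.setInteger("taskmanager.numberOfTaskSlots", 1);
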
Best,
Zhijiang


--
From:Akshay Mendole 
Send Time:2018年11月23日(星期五) 02:54
To:trohrmann 
Cc:zhijiang ; user ; 
Shreesha Madogaran 
Subject:Re: OutOfMemoryError while doing join operation in flink

Hi,
Thanks for your reply. I tried running a simple "group by" on just one 
dataset where a few keys occur repeatedly (on the order of millions), without 
including any joins. I wanted to see if this issue is specific to joins, but as 
I expected, I ran into the same issue. I am giving 7GB to each task manager 
with 2 slots per task manager. From what I understand so far, in such cases 
individual records somewhere in the pipeline become so large that they should 
be handled in a distributed manner instead of by a simple data structure in a 
single JVM. I am guessing there is no way to do this in Flink today. 
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:
Hi Akshay,

Flink currently does not support to automatically distribute hot keys across 
different JVMs. What you can do is to adapt the parallelism/number of 
partitions manually if you encounter that one partition contains a lot of hot 
keys. This might mitigate the problem by partitioning the hot keys into 
different partitions.

Apart from that, the problem seems to be as Zhijiang indicated that your join 
result is quite large. One record is 1 GB large. Try to decrease it or give 
more memory to your TMs.

Cheers,
Till
On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole  wrote:
Hi Zhijiang,
Thanks for the quick reply. My concern is more about how 
Flink performs joins of two skewed datasets. Pig and Spark seem to support 
joins of skewed datasets. The record size you mention in your reply arises 
after the join operation takes place, and in my use case it is definitely going 
to be too huge to fit in a task manager slot's JVM. We want to know if there is 
a way in Flink to handle such skewed keys by distributing their values across 
different JVMs. Let me know if you need more clarity on the issue.
Thanks, 
Akshay 
On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:
Hi Akshay,

You encountered a known issue where serializing large records causes OOM.

Previously, every subpartition created a separate serializer, and each 
serializer maintained an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte arrays are not 
managed by the framework, and their size grows dynamically with the record 
size. If your job has many subpartitions with large records, it may well cause 
an OOM issue.

I already improved this issue to some extent by sharing only one serializer 
across all subpartitions [1], which means there is at most one byte-array 
overhead. This fix is included in release-1.7.
Currently the best option may be to reduce your record size if possible, or to 
increase the heap size of the task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From:Akshay Mendole 
Send Time:2018年11月22日(星期四) 13:43
To:user 
Subject:OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam. The 
Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, 
joins them and dumps them back to HDFS. The data set R1 is skewed: it has a few 
keys with a lot of records. When we co

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

You encountered a known issue where serializing large records causes OOM.

Previously, every subpartition created a separate serializer, and each 
serializer maintained an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte arrays are not 
managed by the framework, and their size grows dynamically with the record 
size. If your job has many subpartitions with large records, it may well cause 
an OOM issue.

I already improved this issue to some extent by sharing only one serializer 
across all subpartitions [1], which means there is at most one byte-array 
overhead. This fix is included in release-1.7.
Currently the best option may be to reduce your record size if possible, or to 
increase the heap size of the task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From:Akshay Mendole 
Send Time:2018年11月22日(星期四) 13:43
To:user 
Subject:OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam. The 
Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, 
joins them and dumps them back to HDFS. The data set R1 is skewed: it has a few 
keys with a lot of records. When we converted the Pig pipeline to Apache Beam 
and ran it using Flink on a production YARN cluster, we got the following 
error 

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) 
(25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed 
to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap 
space
at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at 
org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at 
org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 
1136656562 bytes) exceeds JVM heap space
at 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at 
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at 
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at 
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCod

Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool

2018-10-23 Thread zhijiang
The stack below indicates there are no available buffers for the source 
outputs, including watermarks and normal records, so the source is blocked 
requesting a buffer from the LocalBufferPool.
The checkpoint process is also affected by this blocking request. The root 
question is why the queued output buffers are not consumed by the downstream 
tasks.
I think you can check which downstream task's input-queue usage reaches 100%, 
then jstack the corresponding downstream tasks, which may be stuck in some 
operation that causes the back pressure.

Best,
Zhijiang
--
From:Yan Zhou [FDS Science] 
Send Time:2018年10月23日(星期二) 02:29
To:user@flink.apache.org 
Subject:Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool


 I am using flink 1.5.3 
From: Yan Zhou [FDS Science] 
Sent: Monday, October 22, 2018 11:26
To: user@flink.apache.org
Subject: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool
 Hi,
 My application suddenly got stuck and completely stopped moving forward after 
running for a few days. No exceptions are found. From the thread dump, I can 
see that the operator threads and checkpoint threads deadlock on 
LocalBufferPool: the LocalBufferPool is not able to request memory and keeps 
holding the lock. Please see the thread dump at the bottom. 

It uses RocksDB as the state backend. From the heap dump and web UI, there is 
plenty of memory in the JVM and there is no GC problem. Checkpoints were good 
until the problem occurred:

2018-10-19 04:41:23,691 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 4347 @ 1539891683667 for job 1.
2018-10-19 04:41:45,069 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 4347 for job 1 (1019729450 bytes in 13600 ms).
2018-10-19 04:46:45,089 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 4348 @ 1539892005069 for job 1.
2018-10-19 04:56:45,089 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 4348 
of job 1 expired before completing.


This happened at midnight and the traffic was relatively low. Even if there was 
a spike that caused back pressure, to my understanding the events should 
eventually be processed and the network buffers become available after that. 
What might be the cause of it? 


 Best
 Yan

"Time Trigger for Source: Custom Source -> Flat Map -> Flat Map -> 
Timestamps/Watermarks -> (from: ...s#363 daemon prio=5 os_prio=0 
tid=0x7ff187944000 nid=0x8f76 in Object.wait() [0x7ff12fda9000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
- locked <0x0006dadeeac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:117)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:87)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:603)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:77)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeSer

Re: Need help to understand memory consumption

2018-10-17 Thread Zhijiang(wangzhijiang999)
The operators of streaming jobs do not use the managed memory, which is only 
for batch jobs as you said.
I guess the initial feedback was about batch jobs, judging from the description?
--
From:Paul Lam 
Send Time:2018年10月17日(星期三) 14:35
To:Zhijiang(wangzhijiang999) 
Cc:jpreisner ; user 
Subject:Re: Need help to understand memory consumption

Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post [1] 
said that it can only be used in the batch API, but I might have missed some 
updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

On 2018年10月17日, 13:39, Zhijiang(wangzhijiang999) wrote:
Hi Julien,

Flink manages by default a 70% fraction of the free memory in the TaskManager 
for caching data efficiently, as mentioned in the article 
"https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". 
This managed memory is persistently resident and referenced by the 
MemoryManager once allocated, so it resides in the old region of the JVM and is 
not recycled by GC. This way we can avoid the costs of repeatedly creating and 
recycling objects.

The default value of "taskmanager.memory.preallocate" is false, which means the 
managed memory is not allocated while the TaskManager starts. When a job is 
running, the related tasks request the managed memory, and then you see high 
memory consumption. When the job is cancelled, the managed memory is released 
back to the MemoryManager but not recycled by GC, so you see no change in 
memory consumption. After you restart the TaskManager, the initial memory 
consumption is low because of lazy allocation via 
taskmanager.memory.preallocate=false. A small sketch follows below.

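For illustration, a sketch of toggling eager allocation (an example only; this 
key normally goes into flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // true: allocate all managed memory at TaskManager startup;
    // false (default): allocate lazily on first use, release on job cancel.
    conf.setBoolean("taskmanager.memory.preallocate", true);
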
Best,
Zhijiang
--
From:Paul Lam 
Send Time:2018年10月17日(星期三) 12:31
To:jpreisner 
Cc:user 
Subject:Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC 
to release the memory. 

Best,
Paul Lam

> On 2018年10月12日, 14:29, jpreis...@free.fr wrote:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 
> TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50, 
> ...). All jobs are the same. They connect to Kafka topics and have two DB2 
> connector.
> - Depending on a special event, a job can self-restart via the command : 
> bin/flink cancel 
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and 
> this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after to have cancel the previous 
> job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why when a job is canceled, the memory is not released?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> triggers and windows, ...
> 
> Thanks in advance,
> Julien





Re: Need help to understand memory consumption

2018-10-16 Thread Zhijiang(wangzhijiang999)
Hi Julien,

Flink manages by default a 70% fraction of the free memory in the TaskManager 
for caching data efficiently, as mentioned in the article 
"https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". 
This managed memory is persistently resident and referenced by the 
MemoryManager once allocated, so it resides in the old region of the JVM and is 
not recycled by GC. This way we can avoid the costs of repeatedly creating and 
recycling objects.

The default value of "taskmanager.memory.preallocate" is false, which means the 
managed memory is not allocated while the TaskManager starts. When a job is 
running, the related tasks request the managed memory, and then you see high 
memory consumption. When the job is cancelled, the managed memory is released 
back to the MemoryManager but not recycled by GC, so you see no change in 
memory consumption. After you restart the TaskManager, the initial memory 
consumption is low because of lazy allocation via 
taskmanager.memory.preallocate=false.

Best,
Zhijiang
--
From:Paul Lam 
Send Time:2018年10月17日(星期三) 12:31
To:jpreisner 
Cc:user 
Subject:Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC 
to release the memory. 

Best,
Paul Lam

> On 2018年10月12日, 14:29, jpreis...@free.fr wrote:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 
> TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50, 
> ...). All jobs are the same. They connect to Kafka topics and have two DB2 
> connector.
> - Depending on a special event, a job can self-restart via the command : 
> bin/flink cancel 
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and 
> this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after to have cancel the previous 
> job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why when a job is canceled, the memory is not released?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> triggers and windows, ...
> 
> Thanks in advance,
> Julien



Re: What are channels mapped to?

2018-10-11 Thread Zhijiang(wangzhijiang999)
The channels are mapped to subpartition indices, each of which is consumed by a 
specific parallel instance of the downstream task.

For example, if there are three parallel reduce tasks, every map task generates 
three subpartitions. If one record is hashed to the first channel, that means 
this record will be consumed by the first reduce task.

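Conceptually, the hash-based channel selection is just the following (a 
simplified sketch, not the exact OutputEmitter code):

    // numChannels == parallelism of the downstream (e.g. reduce) task
    int selectChannel(Object key, int numChannels) {
        // The record goes to subpartition `channel`, which is read by the
        // downstream parallel subtask with the same index.
        return (key.hashCode() & Integer.MAX_VALUE) % numChannels;
    }
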
Best,
Zhijiang
--
From:Chris Miller 
Send Time:2018年10月11日(星期四) 16:54
To:user 
Subject:What are channels mapped to?

Hi,
in the OutputEmitter, the output channel can be selected in different manners, 
e.g. OutputEmitter#hashPartitionDefault().
What are the channels mapped to? Do they map to an IP address or a port?
Thanks.
Chris



Re: Small checkpoint data takes too much time

2018-10-09 Thread Zhijiang(wangzhijiang999)
The checkpoint duration includes both barrier alignment and the state snapshot. 
Every task has to receive the barriers from all of its channels before it 
triggers the state snapshot.
I guess the barrier alignment may take a long time in your case, and it is 
especially critical under back pressure. You can check the 
"checkpointAlignmentTime" metric for confirmation.

Best,
Zhijiang
--
From:徐涛 
Send Time:2018年10月10日(星期三) 13:13
To:user 
Subject:Small checkpoint data takes too much time

Hi 
 I recently encountered a problem in production: checkpoints take too much 
time, although this doesn't affect the job execution.
 I am using the FsStateBackend with asynchronous snapshots, writing the data to 
an HDFS checkpointDataUri, and I print the metrics “lastCheckpointDuration” and 
“lastCheckpointSize”. They show that the “lastCheckpointSize” is about 80KB, 
but the “lastCheckpointDuration” is about 160s! Since the checkpoint data is 
small, I think it should not take that long. I do not know why, or which 
conditions may influence the checkpoint time. Has anyone encountered such a 
problem?
 Thanks a lot.

Best
Henry



Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2018-10-08 Thread Zhijiang(wangzhijiang999)
This deadlock actually exists in special scenarios.

Until the bug is fixed, we can work around it by not deploying the map and sink 
tasks in the same task manager.
Krishna, do these two tasks share a slot? If so, you can disable slot sharing 
for this job.

Or I guess we can set ExecutionMode#PIPELINED_FORCED so that no blocking result 
partitions are generated, which avoids this issue temporarily; see the sketch 
below.

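A minimal sketch of that second workaround in the DataSet API (a hedged 
example; forcing pipelined exchanges has its own trade-offs):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Force pipelined data exchanges so no spillable/blocking result
    // partitions (the ones involved in the deadlock) are created.
    env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);
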
Best,
Zhijiang


--
From:Piotr Nowojski 
Send Time:2018年10月4日(星期四) 21:54
To:Aljoscha Krettek 
Cc:"Narayanaswamy, Krishna" ; Nico Kruber 
; user@flink.apache.org 
Subject:Re: Memory Allocate/Deallocate related Thread Deadlock encountered when 
running a large job > 10k tasks

Hi,

Thanks for reporting the problem. This bug was previously unknown to us. I have 
created a jira ticket for this bug:
https://issues.apache.org/jira/browse/FLINK-10491

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t 
know if there is some hot fix or anything that can at least mitigate/decrease 
the probability of the bug for you until we fix it properly. 

Piotrek

On 4 Oct 2018, at 13:55, Aljoscha Krettek  wrote:
Hi,

this looks like a potential Flink bug. Looping in Nico and Piotr who have 
looked into that in the past. Could you please comment on that?

Best,
Aljoscha

On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna  
wrote:

Hi,
I am trying to run one large single job graph which has > 10k tasks. The form 
of the graph is something like
DataSource -> Filter -> Map [...multiple]
Sink1
Sink2
I am using a parallelism of 10 with 1 slot per task manager and a memory 
allocation of 32G per TM. The JM is running with 8G.
Everything starts up and runs fine with close to 6-7k tasks (this is variable 
and is mostly the source/filter/map portions) completing and then the graph 
just hangs.  I managed to connect to the task managers and get a thread dump 
just in time and found the following deadlock on one of the TMs which 
apparently seems to be holding up everything else.
Please could someone take a look and advise if there is something I could do or 
try out to fix this.
Marked below are the 2 isolated thread stacks marking the deadlock -
Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
waiting for monitor entry
 waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
  at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
  at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
   - locked <0x2dfd> (a java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
  - locked <0x2da5> (a java.lang.Object)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
  at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
  at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
  at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
  at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:745)
Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
entry
  java.lang.Thread.State: BLOCKED
 blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
 waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
lock on <0x2dfd> (a java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
  at 
org.apache.flink.run

Re: [DISCUSS] Dropping flink-storm?

2018-09-28 Thread Zhijiang(wangzhijiang999)
I very much agree with dropping it. +1
--
From:Jeff Carter 
Send Time:2018年9月29日(星期六) 10:18
To:dev 
Cc:chesnay ; Till Rohrmann ; user 

Subject:Re: [DISCUSS] Dropping flink-storm?

+1 to drop it.

On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng  wrote:

> Hi,
>
> +1 to drop it. It seems that few people use it.
>
> Best, Hequn
>
> On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler 
> wrote:
>
> > I'm very much in favor of dropping it.
> >
> > Flink has been continually growing in terms of features, and IMO we've
> > reached the point where we should cull some of the more obscure ones.
> > flink-storm, while interesting from a theoretical standpoint, offers too
> > little value.
> >
> > Note that the bolt/spout wrapper parts are still compatible;
> > it's only topologies that aren't working.
> >
> > IMO compatibility layers only add value if they ease the migration to
> > Flink APIs.
> > * bolt/spout wrappers do this, but they will continue to work even if we
> > drop it
> > * topologies don't do this, so I'm not interested in them.
> >
> > On 28.09.2018 15:22, Till Rohrmann wrote:
> > > Hi everyone,
> > >
> > > I would like to discuss how to proceed with Flink's storm
> > > compatibility layer flink-storm.
> > >
> > > While working on removing Flink's legacy mode, I noticed that some
> > > parts of flink-storm rely on the legacy Flink client. In fact, at the
> > > moment flink-storm does not work together with Flink's new distributed
> > > architecture.
> > >
> > > I'm also wondering how many people are actually using Flink's Storm
> > > compatibility layer and whether it would be worth porting it.
> > >
> > > I see two options how to proceed:
> > >
> > > 1) Commit to maintain flink-storm and port it to Flink's new
> architecture
> > > 2) Drop flink-storm
> > >
> > > I doubt that we can contribute it to Apache Bahir [1], because once we
> > > remove the legacy mode, this module will no longer work with all newer
> > > Flink versions.
> > >
> > > Therefore, I would like to hear your opinion on this and in particular
> > > if you are using or planning to use flink-storm in the future.
> > >
> > > [1] https://github.com/apache/bahir-flink
> > >
> > > Cheers,
> > > Till
> >
> >
> >
>



Re: InpoolUsage & InpoolBuffers inconsistence

2018-09-18 Thread Zhijiang(wangzhijiang999)
Hi,

The inpoolQueueLength indicates how many buffers have been received and queued. 
But if the buffers in the queue are events (like barriers), they are not 
counted in the inpoolUsage.
So in your case these two metrics may well be consistent. If you observed 
autoread=false on the downstream side, that means the inpoolUsage may already 
have reached 100% and there are no available buffers to receive data from the 
upstream side. But if the upstream side still has available buffers for its 
output, it will not be blocked in this case.
BTW, if autoread=false and the inpoolUsage reaches 100%, there may be a lot of 
buffers queued in front of the barrier, so the checkpoint may expire as you 
said.

Best,
Zhijiang
--
From:aitozi 
Send Time:2018年9月18日(星期二) 12:59
To:user 
Subject:Re: InpoolUsage & InpoolBuffers inconsistence

My doubt about this comes from debugging a checkpoint-expiration problem.
I encountered checkpoint expiration with no back pressure shown in the web UI.
After adding a lot of logging, I found that the barrier was sent to the
downstream task, but the downstream channel may be set to autoread=false,
which blocks consumption of the barrier. Yet buffers sitting temporarily in
the input channel do not cause upstream back pressure.

I think this situation can be monitored by checking the inpoolUsage metric:
when it is 1, there may be a problem. But when I checked the inpoolUsage and
inpoolQueueLength, I found the inconsistency described above. The inpoolUsage
is calculated as bestEffortGetUsedBuffer / allBuffers; could this lead to the
mismatch?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink application down due to RpcTimeout exception

2018-09-13 Thread Zhijiang(wangzhijiang999)
Hi,

1. This RPC timeout occurs while the JobMaster deploys a task into the 
TaskExecutor: the RPC thread in the TaskExecutor does not respond to the 
deployment message within 10 seconds. There are many possible causes for this, 
such as a network problem between the TaskExecutor and the JobMaster, or other 
time-consuming operations in the TaskExecutor. The root cause may be a bit 
complicated to trace. First you can debug when the TaskExecutor receives this 
message, then check when the TaskExecutor responds to it, and you may also need 
to check what the RPC thread is doing during that time.

2. You can increase the default value of the RPC timeout parameter 
(akka.ask.timeout) to work around it temporarily; see the sketch below.

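For illustration, a sketch of raising that timeout (the value is an example; 
this key normally goes into flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // Raise the RPC ask timeout from the default 10 s.
    conf.setString("akka.ask.timeout", "60 s");
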
Best,
Zhijiang
--
From:徐涛 
Send Time:2018年9月13日(星期四) 14:10
To:user 
Subject:Flink application down due to RpcTimeout exception

Hi All,
 I'm running Flink 1.6 on YARN. After the program had run for a day, it failed 
on YARN, and the error log is as follows.
 It seems to be due to a timeout error, but I have the following questions:
 1. In which step did the Flink components fail to communicate? Which are the 
two components?
 2. How can I solve this problem?
 Thanks a lot!!

java.lang.Exception: Cannot deploy task LeftOuterJoin(where: (=(id, 
article_id)), join: (id, created_time, article_score, PU, article_id, CU, CN)) 
-> select: (id, created_time, article_score, PU, CU, CN) (2/2) 
(d403002a7accc5133cf89a386ddc1dfb) - TaskManager 
(container_1532509321420_463249_01_02 @ sh-bs-3-i1-hadoop-17-225 
(dataPort=10459)) not responding after a rpcTimeout of 10000 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:601)
 ~[flink-runtime_2.11-1.6.0.jar:1.6.0]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[na:1.8.0_65]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[na:1.8.0_65]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 ~[na:1.8.0_65]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_65]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[na:1.8.0_65]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[na:1.8.0_65]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
~[na:1.8.0_65]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@sh-bs-3-i1-hadoop-17-225:24213/user/taskmanager_0#-1762816591]]
 after [10000 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) 
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 ~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
~[scala-library-2.11.8.jar:na]
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
 ~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
 ~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
 ~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
 ~[akka-actor_2.11-2.4.20.jar:na]
... 1 common frames omitted


Best,
Henry


