You can refer to this document [1] for the rest API details.
Actually the backpreesure uri refers to 
"/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is 
easy to get the jobid and vertexid.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html

Best,
Zhijiang
------------------------------------------------------------------
From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Send Time:2019 Nov. 7 (Thu.) 00:06
To:Chesnay Schepler <ches...@apache.org>
Cc:Zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
Subject:Re: How can I get the backpressure signals inside my function or 
operator?

If I can trigger the sample via rest API it is good for a POC. Then I can read 
from any in-memory storage using a separated thread within the operator. But 
what is the rest api that gives to me the ratio value from backpressure?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <ches...@apache.org> wrote:

I don't think there is a truly sane way to do this. 
I could envision a separate application triggering samples via the REST API, 
writing the results into kafka which your operator can read. This is probably 
the most reasonable solution I can come up with.
Any attempt at accessing the TaskExecutor or metrics from within the operator 
are inadvisable; you'd be encroaching into truly hacky territory.
You could also do your own backpressure sampling within your operator (separate 
thread within the operator executing the same sampling logic), but I don't know 
how easy it would be to re-use Flink code.

On 06/11/2019 13:40, Felipe Gutierrez wrote:
Does anyone know in which metric I can rely on to know if a given operator is 
activating the backpressure? 
Or how can I call the same java object that the Flink UI calls to give me the 
ratio of backpressure?

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com            

On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> 
wrote:
Hi Zhijiang, 

thanks for your reply. Yes, you understood correctly.
The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength" on 
the operator might be because of the way Flink runtime architecture was 
designed. But I was wondering what kind of signal I can get. I guess some 
backpressure message I could get because backpressure works to slow down the 
upstream operators. 

For example, I can see the ratio per sub-task on the web interface [1]. It 
means the physical operators. Is there any message flowing backward that I can 
get? Is there anything that makes me able to not rely on some external storage?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com            

On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <wangzhijiang...@aliyun.com> wrote:
Hi Felipe,

That is an interesting idea to control the upstream's output based on 
downstream's input.

 If I understood correctly, the preAggregate operator would trigger flush 
output while the reduce operator is idle/hungry. In contrast, the preAggregate 
would continue aggregating data in the case of back pressure.

I think this requirement is valid, but unfortunately I guess you can not get 
the back pressure signal from the operator level. AIK only the upper task level 
can get the input/output state to decide whether to process or not.

If you want to get the reduce's metric of 
`Shuffle.Netty.Input.Buffers.inputQueueLength` on preAggregate side, you might 
rely on some external metric reporter to query it if possible.

Best,
Zhijiang 

------------------------------------------------------------------
From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user <user@flink.apache.org>
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

 let's say that I have a "source -> map .> preAggregrate -> keyBy -> reduce -> 
sink" job and the reducer is sending backpressure signals to the preAggregate, 
map and source operator. How do I get those signals inside my operator's 
implementation?
 I guess inside the function is not possible. But if I have my own operator 
implemented (preAggregate) can I get those backpressure signals?

 I want to get the messages "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] 
on my preAggregate operator in order to decide when I stop the pre-aggregation 
and flush tuples or when I keep pre aggregating. It is something like the 
"credit based control on the network stack" [2].

 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
 [2] https://www.youtube.com/watch?v=AbqatHF3tZI

 Thanks!
 Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com  



Reply via email to