[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592182#comment-16592182
 ] 

Luka Jurukovski commented on FLINK-10195:
-----------------------------------------

[~StephanEwen]
From what I can tell, no, although this has been very much a crash course in 
RabbitMQ for me. Judging by the forums, prefetch.count is how this is normally 
handled: the consumer tells RabbitMQ how many unacked messages to allow before 
it stops delivering. I.e., if prefetch.count is set to 10,000, RabbitMQ will 
deliver at most 10,000 unacknowledged messages and then wait; once 
acknowledgements arrive, it resumes sending until it hits this max again.
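
To make the prefetch behaviour concrete, here is a toy model of the 
broker-side window. It is a sketch, not the RabbitMQ client API: the class 
PrefetchWindowModel and its methods are hypothetical names invented for 
illustration. In the real Java client the limit is set on the channel with 
Channel.basicQos(int prefetchCount).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a broker-side prefetch window (hypothetical, not the RabbitMQ API). */
class PrefetchWindowModel {
    private final int prefetchCount;                       // max unacked deliveries
    private final Deque<Long> unacked = new ArrayDeque<>(); // delivery tags awaiting ack
    private long nextTag = 1;

    PrefetchWindowModel(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** Broker attempts one delivery; returns false once the window is full. */
    boolean tryDeliver() {
        if (unacked.size() >= prefetchCount) {
            return false; // broker stops pushing until something is acked
        }
        unacked.addLast(nextTag++);
        return true;
    }

    /** Acknowledges every delivery up to and including the given tag. */
    void ackUpTo(long deliveryTag) {
        while (!unacked.isEmpty() && unacked.peekFirst() <= deliveryTag) {
            unacked.removeFirst();
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        PrefetchWindowModel broker = new PrefetchWindowModel(3);
        int delivered = 0;
        while (broker.tryDeliver()) {
            delivered++;
        }
        System.out.println("delivered before stall: " + delivered);        // 3
        broker.ackUpTo(2);
        System.out.println("unacked after ack: " + broker.unackedCount()); // 1
    }
}
```

The point of the model is that delivery only resumes after an ack, so the 
window bounds the client-side buffer but also stalls the consumer whenever 
acks are delayed.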

I would imagine that Flink would not want to use this mechanism, because it 
doesn't actually "backpressure" given how checkpointing is tied to acking. One 
would have to do a throughput calculation and hope that there isn't any 
variance in that number that leaves Flink idle while waiting on the next 
checkpoint. Additionally, since sync checkpointing is a feature, there are no 
guarantees that checkpointing will happen at a regular interval.
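
The throughput calculation mentioned above can be sketched as simple 
arithmetic. This assumes that acks are released only when a checkpoint 
completes and that checkpoints complete at exactly the configured interval, 
which is precisely what cannot be guaranteed. The class PrefetchSizing and the 
numbers in main are hypothetical.

```java
/** Back-of-the-envelope sizing of prefetch.count (hypothetical helper). */
class PrefetchSizing {

    /** Smallest prefetch count that keeps the source busy for one full checkpoint interval. */
    static long minPrefetch(double recordsPerSecond, long checkpointIntervalMillis) {
        return (long) Math.ceil(recordsPerSecond * checkpointIntervalMillis / 1000.0);
    }

    /** Milliseconds per interval the source sits idle once the prefetch window is used up. */
    static long idleMillis(long prefetchCount, double recordsPerSecond, long checkpointIntervalMillis) {
        double busyMillis = prefetchCount / recordsPerSecond * 1000.0;
        return Math.max(0L, checkpointIntervalMillis - (long) Math.ceil(busyMillis));
    }

    public static void main(String[] args) {
        // Assumed workload: 5,000 records/s, checkpoint every 10 s.
        System.out.println(minPrefetch(5_000, 10_000));        // 50000
        // With prefetch.count = 10,000 the window is used up after 2 s.
        System.out.println(idleMillis(10_000, 5_000, 10_000)); // 8000
    }
}
```

Any change in throughput or checkpoint duration moves these numbers, so a 
fixed prefetch.count is either too small (idle time) or too large (heap 
pressure).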

Under the covers, the QueueingConsumer uses a LinkedBlockingQueue and appends 
to it with the non-blocking "add" method.
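
A small JDK-only sketch of why this never pushes back: a LinkedBlockingQueue 
created without a capacity is bounded only by Integer.MAX_VALUE, and "add" 
returns immediately. The class UnboundedQueueDemo is hypothetical and does 
not touch RabbitMQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Shows that add() on a default LinkedBlockingQueue accepts items with no consumer. */
class UnboundedQueueDemo {

    /** Adds n items with the non-blocking add() and returns the resulting queue size. */
    static int fillWithoutConsumer(BlockingQueue<Integer> queue, int n) {
        for (int i = 0; i < n; i++) {
            queue.add(i); // would throw IllegalStateException only if the queue were full
        }
        return queue.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        System.out.println("capacity = " + queue.remainingCapacity());          // 2147483647
        System.out.println("buffered = " + fillWithoutConsumer(queue, 100_000)); // 100000
    }
}
```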

I tried changing it to use an ArrayBlockingQueue with a set capacity and the 
blocking "put" method; however, this results in another problem with RabbitMQ. 
RabbitMQ sometimes terminates the connection to Flink when Flink doesn't 
dequeue from the queue fast enough (I notice this usually happens only when 
sync checkpointing is on and there are long-running checkpoints). According to 
some of the forums, this is because Rabbit has some sort of timeout on how 
long it is willing to wait when writing to a client's buffer.
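
The blocking behaviour of "put" can be reproduced with the JDK alone. In this 
sketch the producer thread stands in for the client's delivery thread; the 
class BoundedPutDemo is hypothetical, and the connection timeout itself is not 
modelled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Shows that put() on a full ArrayBlockingQueue parks the producing thread. */
class BoundedPutDemo {

    /** Returns true if the producer was still blocked in put() after the queue filled up. */
    static boolean producerBlocksWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i <= capacity; i++) { // one more item than fits
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            producer.start();
            producer.join(200);                  // give it time to fill the queue
            boolean blocked = producer.isAlive(); // still stuck on the last put()
            queue.take();                        // consumer frees one slot
            producer.join();                     // producer can now finish
            return blocked && queue.size() == capacity;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(producerBlocksWhenFull(2)); // true
    }
}
```

While the producer is parked nothing is read from the socket, which would be 
consistent with the broker eventually giving up on the connection.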

I have some ugly code that I am testing where I turn off the consumer when the 
buffer is full, plus a monitoring thread that turns it back on when the buffer 
is below a certain capacity. I don't know if this approach will cause any 
other issues, and I am testing more. I might be able to get rid of the 
monitoring thread, but I'll look into that once I have proved out this way of 
doing things.
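
A minimal sketch of the pause/resume idea with high and low watermarks 
follows. It is not the code being tested above: WatermarkBuffer and 
ConsumerControl are hypothetical names, the low watermark is checked on the 
dequeue path instead of by a monitoring thread, and how pause() and resume() 
map onto the RabbitMQ client (for example via Channel.basicCancel and 
Channel.basicConsume) is an assumption left to the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Bounded buffer that pauses deliveries at a high watermark and resumes at a low one. */
class WatermarkBuffer<T> {

    /** Hypothetical hook for switching broker deliveries off and on. */
    interface ConsumerControl {
        void pause();
        void resume();
    }

    private final BlockingQueue<T> queue;
    private final int highWatermark;
    private final int lowWatermark;
    private final ConsumerControl control;
    private final AtomicBoolean paused = new AtomicBoolean(false);

    WatermarkBuffer(int capacity, int highWatermark, int lowWatermark, ConsumerControl control) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark || highWatermark > capacity) {
            throw new IllegalArgumentException("need 0 <= low < high <= capacity");
        }
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
        this.control = control;
    }

    /** Called from the delivery thread; never blocks. Returns false if the buffer is full. */
    boolean onDelivery(T message) {
        boolean accepted = queue.offer(message);
        if (queue.size() >= highWatermark && paused.compareAndSet(false, true)) {
            control.pause();
        }
        return accepted;
    }

    /** Called from the source thread; returns null when the buffer is empty. */
    T poll() {
        T message = queue.poll();
        if (queue.size() <= lowWatermark && paused.compareAndSet(true, false)) {
            control.resume();
        }
        return message;
    }

    boolean isPaused() {
        return paused.get();
    }

    int size() {
        return queue.size();
    }
}
```

Keeping the capacity above the high watermark leaves headroom for messages 
that are already in flight when the consumer is paused; a delivery that still 
finds the buffer full is reported to the caller rather than blocking the 
delivery thread.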

Any additional thoughts or comments are welcome. Sorry for the wall of text.

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>
>                 Key: FLINK-10195
>                 URL: https://issues.apache.org/jira/browse/FLINK-10195
>             Project: Flink
>          Issue Type: Bug
>          Components: RabbitMQ Connector
>    Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>            Reporter: Luka Jurukovski
>            Priority: Major
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically up to Integer.MAX_VALUE deliveries). 
> In RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't obvious from the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
