Hi Piotr,

Thanks for the further questions.

To answer both of your questions, let's consider the possible interaction flows
between RateLimitingStrategy and AsyncSinkWriter (IMO this is important, since
it affects the interface!):

One potential interaction flow is:
1. AsyncSinkWriter constructs a request with a batch of messages
  - This can be triggered by one of 3 conditions: a timer trigger, the batch
    byte size threshold, or the batch message count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock()
3. If the request is started, AsyncSinkWriter calls
   RateLimitingStrategy.startedRequest(RequestInfo)
4. When the request completes, AsyncSinkWriter calls
   RateLimitingStrategy.completedRequest(RequestInfo)
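The flow above can be sketched as a toy Java writer loop. This assumes the single-interface shape Piotr proposed further down this thread. `RequestInfo` and the three `RateLimitingStrategy` methods follow that proposal. `FixedMaxInFlightStrategy`, `ToySinkWriter`, `tryFlush()` and `completeOldest()` are hypothetical names for illustration, and none of this is the actual AsyncSinkWriter code. The sketch passes the candidate `RequestInfo` to `shouldBlock()`, which is one of the options weighed in Q1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types mirroring the proposal in this thread (not a released Flink API).
final class RequestInfo {
    final int failedMessages;
    final int batchSize;
    final long requestStartTime;

    RequestInfo(int failedMessages, int batchSize, long requestStartTime) {
        this.failedMessages = failedMessages;
        this.batchSize = batchSize;
        this.requestStartTime = requestStartTime;
    }
}

interface RateLimitingStrategy {
    boolean shouldBlock(RequestInfo candidate);
    void startedRequest(RequestInfo requestInfo);
    void completedRequest(RequestInfo requestInfo);
}

// Hypothetical strategy: never let the number of in-flight messages exceed a fixed cap.
final class FixedMaxInFlightStrategy implements RateLimitingStrategy {
    private final int maxInFlightMessages;
    private int currentInFlightMessages = 0;

    FixedMaxInFlightStrategy(int maxInFlightMessages) {
        this.maxInFlightMessages = maxInFlightMessages;
    }

    @Override
    public boolean shouldBlock(RequestInfo candidate) {
        return currentInFlightMessages + candidate.batchSize > maxInFlightMessages;
    }

    @Override
    public void startedRequest(RequestInfo requestInfo) {
        currentInFlightMessages += requestInfo.batchSize;
    }

    @Override
    public void completedRequest(RequestInfo requestInfo) {
        currentInFlightMessages -= requestInfo.batchSize;
    }

    int inFlight() { return currentInFlightMessages; }
}

// Hypothetical stand-in for AsyncSinkWriter: sending is simulated and the caller
// triggers completion, so the four steps can be followed one at a time.
final class ToySinkWriter {
    private final RateLimitingStrategy strategy;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<RequestInfo> inFlight = new ArrayList<>();

    ToySinkWriter(RateLimitingStrategy strategy, int maxBatchSize) {
        this.strategy = strategy;
        this.maxBatchSize = maxBatchSize;
    }

    void write(String message) { buffer.add(message); }

    int buffered() { return buffer.size(); }

    // Returns false if there is nothing to send or the strategy blocks the request.
    boolean tryFlush(long now) {
        // 1. Construct a batch (only the message-count threshold is modelled here).
        int batchSize = Math.min(maxBatchSize, buffer.size());
        if (batchSize == 0) {
            return false;
        }
        RequestInfo info = new RequestInfo(0, batchSize, now);
        // 2. Ask the strategy whether this request may start.
        if (strategy.shouldBlock(info)) {
            return false;
        }
        // 3. "Start" the request by removing the batch from the buffer, then notify the strategy.
        buffer.subList(0, batchSize).clear();
        inFlight.add(info);
        strategy.startedRequest(info);
        return true;
    }

    // 4. Simulates the async client reporting that the oldest in-flight request finished.
    void completeOldest() {
        RequestInfo info = inFlight.remove(0);
        strategy.completedRequest(info);
    }
}
```

Only the message-count trigger is modelled. A real writer would also flush on the timer and on the byte-size threshold.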

## Q1: Do we need to pass the batch of messages / RequestInfo to `shouldBlock()`?
My thought is that `shouldBlock()` needs to know whether
(currentInFlight + newRequest > maxInFlight), and reject the request if so. An
alternative that avoids passing this information in is to make
RateLimitingStrategy only reject requests AFTER currentInFlight > maxInFlight.
However, I don't like the idea that a sink can exceed its
`maxInFlightMessages`, since it would not be doing what it says on the box.
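The two options in Q1 can be contrasted with a toy counter. `InFlightCounter`, `shouldBlockBefore`, `shouldBlockAfter` and `Q1Comparison` are hypothetical names. In this simplified sketch nothing completes between attempts, so it only shows how far each check lets the in-flight count go.

```java
// Hypothetical in-flight counter contrasting the two blocking checks from Q1.
final class InFlightCounter {
    final int maxInFlight;
    int currentInFlight = 0;

    InFlightCounter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // Option 1: the new request's size is passed in, so the cap is never exceeded.
    boolean shouldBlockBefore(int newRequestSize) {
        return currentInFlight + newRequestSize > maxInFlight;
    }

    // Option 2: no argument; it only blocks once the cap has already been exceeded.
    boolean shouldBlockAfter() {
        return currentInFlight > maxInFlight;
    }
}

final class Q1Comparison {
    // Offers `attempts` requests of `requestSize` messages with no completions in between
    // and returns the resulting number of in-flight messages.
    static int inFlightAfter(int maxInFlight, int requestSize, int attempts, boolean passSize) {
        InFlightCounter counter = new InFlightCounter(maxInFlight);
        for (int i = 0; i < attempts; i++) {
            boolean blocked = passSize
                    ? counter.shouldBlockBefore(requestSize)
                    : counter.shouldBlockAfter();
            if (!blocked) {
                counter.currentInFlight += requestSize; // stands in for startedRequest(...)
            }
        }
        return counter.currentInFlight;
    }
}
```

With maxInFlight = 10 and requests of 4 messages, option 1 stops at 8 in-flight messages while option 2 reaches 12, which is the overshoot described above.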

## Q2: Do we need to expose nextBatchSize or something similar to affect the construction of the next batch?
If we go with never exceeding maxInFlightMessages, we can end up in a situation
where there are no in-flight requests and the next request's batch size is
larger than maxInFlightMessages. That request would then be blocked forever:
1) RateLimitingStrategy's maxInFlightMessages will not be updated, since it is
only updated when a request is started / completed.
2) We cannot guarantee that the next request will have a smaller batch size,
since AsyncSinkWriter constructs batches independently of the
maxInFlightMessages currently set in RateLimitingStrategy.

We can close this gap by exposing `currentMaxBatchSize()` or `nextBatchSize()` 
in RateLimitingStrategy that will tell AsyncSinkWriter the maximum size of the 
next batch.
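A toy illustration of the gap and of closing it with `currentMaxBatchSize()`. `ScaledDownStrategy` and `Q2Demo.tryFlush` are hypothetical names. This sketch assumes one possible definition of `currentMaxBatchSize()`, namely that it returns the strategy's current cap, so a batch of that size can always start once nothing is in flight.

```java
// Hypothetical strategy whose in-flight cap has been scaled down (e.g. after throttling).
final class ScaledDownStrategy {
    final int maxInFlightMessages;
    int currentInFlightMessages = 0;

    ScaledDownStrategy(int maxInFlightMessages) {
        this.maxInFlightMessages = maxInFlightMessages;
    }

    boolean shouldBlock(int batchSize) {
        return currentInFlightMessages + batchSize > maxInFlightMessages;
    }

    void startedRequest(int batchSize) {
        currentInFlightMessages += batchSize;
    }

    // Hypothetical method from Q2; here it simply exposes the current cap.
    int currentMaxBatchSize() {
        return maxInFlightMessages;
    }
}

final class Q2Demo {
    // Builds one batch from `buffered` messages and tries to start it.
    // Returns the number of messages sent (0 if the strategy blocked).
    static int tryFlush(ScaledDownStrategy strategy, int writerMaxBatchSize, int buffered,
                        boolean askStrategyForBatchSize) {
        int limit = askStrategyForBatchSize
                ? Math.min(writerMaxBatchSize, strategy.currentMaxBatchSize())
                : writerMaxBatchSize;
        int batchSize = Math.min(limit, buffered);
        if (batchSize == 0 || strategy.shouldBlock(batchSize)) {
            return 0;
        }
        strategy.startedRequest(batchSize);
        return batchSize;
    }
}
```

With a cap of 3 and a writer batch size of 5, every call without the extra method returns 0. Because nothing is in flight, no completedRequest() call will ever arrive to change that. With the batch size capped by the strategy, a 3-message request starts. An alternative definition could return the remaining capacity (maxInFlightMessages - currentInFlightMessages), which sends sooner but may produce smaller batches.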

> btw, isn't this variable misnamed? (maxBufferedRequests)

Yes, you are right. It would be more appropriately named maxBufferedMessages or
maxBufferedRequestEntries. Since these names are internal, we can safely rename
it.

Regards,
Hong



On 24/06/2022, 17:07, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

    CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.



    Hi Hong,

    Thanks for your clarifications.

    > We can change it such that AsyncSinkWriter passes a constructed batch of
    > messages to the RateLimitingStrategy.shouldBlock(), and that returns a
    > boolean, so the RateLimitingStrategy can decide the evaluation logic
    > internally.

    Do we need to pass the constructed batch of messages to `shouldBlock()`?
    Can we not call `startRequest()` first, then ask whether we should block?
    We could check `shouldBlock()` either after or before actually starting the
    request that we marked in `startRequest()` - it seems to me this could be
    an implementation detail. Either way, the system would behave more or
    less the same. Does it matter if we block one request later or sooner?

    > but we also have to expose "currentInFlightMessageCapacity" from the
    > RateLimitingStrategy

    Do we have to? Currently this is checked against
    `AsyncSinkWriter#maxBufferedRequests`, can we not keep it like that? And
    btw, isn't this variable misnamed? It suggests checking the max number of
    requests (one async request = one batch?), but from the code it looks like
    it's actually `maxBufferedMessages`?

    Best,
    Piotrek


    On Fri, 24 Jun 2022 at 09:34, Teoh, Hong <lian...@amazon.co.uk> wrote:

    > Hi Piotr,
    >
    > Thanks for your feedback!
    >
    > > As I understand it, this effort is about replacing hardcoded
    >     `AIMDRateLimitingStrategy` with something more flexible?
    >
    > Yes
    >
    >
    > > I have one main question about the design. Why are you
    >     trying to split it into three different interfaces?
    >     Can we not have a single public interface `RateLimitingStrategy`
    >
    > You're right about the intention being to separate out the (what), (when)
    > and (how).
    >
    > The intention here was to make the separation of concerns clearer, but I
    > like your idea to reduce the surface area in the interface.
    > We can change it such that AsyncSinkWriter passes a constructed batch of
    > messages to the RateLimitingStrategy.shouldBlock(), and that returns a
    > boolean, so the RateLimitingStrategy can decide the evaluation logic
    > internally. (That was my original idea too.)
    >
    > 1. RateLimitingStrategy can update its internal logic in the
    > `completeRequest` and `startRequest` methods
    > 2. RateLimitingStrategy can provide a go/no-go decision on `shouldBlock`,
    > given a List of requests.
    >
    > The above works, but we also have to expose
    > "currentInFlightMessageCapacity" from the RateLimitingStrategy
    > (since it is important that the batch of messages in the proposed request
    > is smaller than currentInFlightMessageCapacity), otherwise we will end up
    > in a situation where requests will never be sent.
    >
    > An alternative is to give RateLimitingStrategy the power to decide the
    > size of the batches, but I think that would bloat the responsibility of
    > the RateLimitingStrategy a little too much. What do you think?
    >
    >
    > Regards,
    > Hong
    >
    > On 23/06/2022, 10:05, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
    >
    >
    >     Hi Hong,
    >
    >     As I understand it, this effort is about replacing hardcoded
    >     `AIMDRateLimitingStrategy` with something more flexible? +1 for the
    >     general idea.
    >
    >     If I read it correctly, there are basically three issues:
    >     1. (what) `AIMDRateLimitingStrategy` is only able to limit the size of
    >     all in-flight records across all batches, not the number of in-flight
    >     batches.
    >     2. (when) Currently `AsyncSinkWriter` decides whether and when to scale
    >     up or down. You would like this behaviour to be customisable.
    >     3. (how) The actual `scaleUp()` and `scaleDown()` behaviours are
    >     hardcoded, and this could be customised as well.
    >
    >     Right? Assuming so, I have one main question about the design. Why are
    >     you trying to split it into three different interfaces?
    >     Can we not have a single public interface `RateLimitingStrategy`
    >     instead of the three that you proposed, with methods like:
    >
    >     `boolean shouldRateLimit()` / `boolean shouldBlock()`
    >     `void startedRequest(RequestInfo requestInfo)`
    >     `void completedRequest(RequestInfo requestInfo)`
    >
    >     where `RequestInfo` is a simple POJO similar to the
    >     `CongestionControlInfo` that you suggested:
    >
    >     public class RequestInfo {
    >       int failedMessages;
    >       int batchSize;
    >       long requestStartTime;
    >     }
    >
    >     I think it would be more flexible and, at the same time, a simpler
    >     public interface. We could also provide the same builder that you
    >     proposed in "Example configuring the Congestion Control Strategy using
    >     the new interfaces". Or am I missing something?
    >
    >     Best Piotrek
    >
    >     On Mon, 20 Jun 2022 at 09:52, Teoh, Hong <lian...@amazon.co.uk.invalid>
    >     wrote:
    >
    >     > Hi all,
    >     >
    >     > I would like to open a discussion on FLIP-242: Introduce configurable
    >     > CongestionControlStrategy for Async Sink.
    >     >
    >     > The Async Sink was introduced as part of FLIP-171 [1], and implements
    >     > a non-configurable congestion control strategy to reduce network
    >     > congestion when the destination rejects or throttles requests. We want
    >     > to make this configurable, to allow the sink implementer to decide the
    >     > desired congestion control behaviour given a specific destination.
    >     >
    >     > This is a step towards reducing the barrier to entry for writing new
    >     > async sinks in Flink!
    >     >
    >     > You can find more details in FLIP-242 [2]. Looking forward to your
    >     > feedback!
    >     >
    >     > [1]
    >     > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    >     > [2]
    >     > https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+CongestionControlStrategy+for+Async+Sink
    >     >
    >     >
    >     > Regards,
    >     > Hong
    >     >
    >     >
    >     >
    >
    >
