rmetzger commented on code in PR #584:
URL: https://github.com/apache/flink-web/pull/584#discussion_r1031435470


##########
_posts/2022-11-25-async-sink-rate-limiting-strategy.md:
##########
@@ -0,0 +1,195 @@
+---
+layout: post
+title: "Optimising the throughput of async sinks using a custom 
RateLimitingStrategy"
+date: 2022-11-22T12:00:00.000Z
+authors:
+- liangtl:
+  name: "Hong Liang Teoh"
+excerpt: An overview of how to optimise the throughput of async sinks using a 
custom RateLimitingStrategy
+---
+
+## Introduction
+
+When designing a Flink data processing job, one of the key concerns is maximising job throughput. Sink throughput is a crucial factor because it can determine the throughput of the entire job. We generally want the highest possible write rate in the sink without overloading the destination. However, since the factors that impact the destination's performance can vary over the job's lifetime, the sink needs to adjust this write rate dynamically. Depending on the destination being written to, we might want to tune the write rate differently using a different RateLimitingStrategy.
+
+**This post explains how you can configure a custom RateLimitingStrategy on a 
connector that builds on the [AsyncSinkBase 
(FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink)
 to optimise sink throughput.** In the sections below, we first cover the 
design logic behind the AsyncSinkBase and the RateLimitingStrategy, then we 
take you through two example implementations of rate limiting strategies, 
specifically the CongestionControlRateLimitingStrategy and 
TokenBucketRateLimitingStrategy.
+
+### Background of the AsyncSinkBase
+
+When implementing the AsyncSinkBase, our main goal was to simplify building new async sinks to custom destinations by providing the common functionality needed for async sinks with at-least-once processing. This has allowed users to more easily write sinks to custom destinations, such as Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose. An additional async sink to Amazon DynamoDB ([FLIP-252](https://cwiki.apache.org/confluence/display/FLINK/FLIP-252%3A+Amazon+DynamoDB+Sink+Connector)) is also being developed at the time of writing.
+
+The AsyncSinkBase offers the core implementation that handles the mechanics of async requests and responses. This includes retrying failed messages, deciding when to flush records to the destination, and persisting unflushed records to state during checkpointing. To improve throughput, the async sink also dynamically adjusts the request rate depending on the responses returned from the destination. Read more about this in our [previous 1.15 release blog post](https://flink.apache.org/2022/05/06/async-sink-base.html) or watch our [Flink Forward talk recording explaining the design of the Async Sink](https://www.youtube.com/watch?v=z-hYuLgbHuo).
+
+### Configuring the AsyncSinkBase
+
+When designing the AsyncSinkBase, we wanted users to be able to tune their 
custom connector implementations based on their specific use case and needs, 
without having to understand the low-level workings of the base sink itself.
+
+So, as part of our initial implementation in Flink 1.15, we exposed 
configurations such as `maxBatchSize`, `maxInFlightRequests`, 
`maxBufferedRequests`, `maxBatchSizeInBytes`, `maxTimeInBufferMS` and 
`maxRecordSizeInBytes` so that users can change the flushing and writing 
behaviour of the sink.
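+
+For illustration, here is a rough sketch of how these knobs can be set on a connector that builds on the AsyncSinkBase, using the Amazon Kinesis Data Streams sink as an example (the values shown are purely illustrative, and the surrounding setup is elided):
+
+```java
+KinesisStreamsSink<String> sink =
+    KinesisStreamsSink.<String>builder()
+        .setStreamName("my-stream")  // illustrative stream name
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+        .setKinesisClientProperties(clientProperties)
+        // Flushing and writing behaviour exposed by the AsyncSinkBase
+        .setMaxBatchSize(500)
+        .setMaxInFlightRequests(50)
+        .setMaxBufferedRequests(10_000)
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)
+        .setMaxTimeInBufferMS(5_000)
+        .setMaxRecordSizeInBytes(1024 * 1024)
+        .build();
+```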
+
+In Flink 1.16, we have further extended this configurability to the RateLimitingStrategy used by the AsyncSinkBase ([FLIP-242](https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+RateLimitingStrategy+for+Async+Sink)). With this change, users can now customise how the AsyncSinkBase dynamically adjusts the request rate in real time to optimise throughput whilst mitigating back pressure. Example customisations include changing the mathematical function used to scale the request rate, implementing a cool-off period between rate adjustments, or implementing a token bucket RateLimitingStrategy.
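+
+For connector developers, a minimal sketch of wiring a custom strategy into a sink writer might look like the following. This assumes the `AsyncSinkWriterConfiguration` builder introduced by FLIP-242; `MyCustomRateLimitingStrategy` is a placeholder for your own implementation:
+
+```java
+// Inside a custom sink's writer creation logic
+AsyncSinkWriterConfiguration configuration =
+    AsyncSinkWriterConfiguration.builder()
+        .setMaxBatchSize(500)
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)
+        .setMaxInFlightRequests(50)
+        .setMaxBufferedRequests(10_000)
+        .setMaxTimeInBufferMS(5_000)
+        .setMaxRecordSizeInBytes(1024 * 1024)
+        // Swap in a custom RateLimitingStrategy here
+        .setRateLimitingStrategy(new MyCustomRateLimitingStrategy())
+        .build();
+```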
+
+## Rationale behind the RateLimitingStrategy interface
+
+```java
+public interface RateLimitingStrategy {
+
+    // Information provided to the RateLimitingStrategy
+    void registerInFlightRequest(RequestInfo requestInfo);
+    void registerCompletedRequest(ResultInfo resultInfo);
+
+    // Controls offered to the RateLimitingStrategy
+    boolean shouldBlock(RequestInfo requestInfo);
+    int getMaxBatchSize();
+}
+```
+
+There are two core ideas behind the RateLimitingStrategy interface:
+
+* **Information methods:** We need methods that provide the RateLimitingStrategy with sufficient information to track the rate of requests being made or the rate of messages being sent (each request can contain more than one message).
+* **Control methods:** We also need methods that allow the RateLimitingStrategy to control the request rate of the sink.
+
+These are the types of methods we see in the RateLimitingStrategy interface. With `registerInFlightRequest()` and `registerCompletedRequest()`, the RateLimitingStrategy has sufficient information to track the number of in-flight requests and in-flight messages, as well as the rate of these requests.
+
+With `shouldBlock()`, the RateLimitingStrategy can decide to postpone any new requests until a specified condition is met (e.g. the number of current in-flight requests must not exceed a given limit). This allows the RateLimitingStrategy to control the rate of requests being made to the destination, either to increase throughput or to increase backpressure in the Flink job graph.
+
+With `getMaxBatchSize()`, the RateLimitingStrategy can dynamically adjust the 
number of messages packaged into a single request. This might be useful to 
optimise sink throughput if the request size affects the performance of the 
destination.
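+
+To make this interplay concrete, here is a minimal, hypothetical strategy that uses the information methods to track in-flight requests and the control methods to cap them at a fixed limit (the constants are purely illustrative):
+
+```java
+public class FixedConcurrencyRateLimitingStrategy implements RateLimitingStrategy {
+
+    private static final int MAX_IN_FLIGHT_REQUESTS = 10; // illustrative limit
+    private static final int MAX_BATCH_SIZE = 100;        // illustrative batch size
+
+    private int currentInFlightRequests;
+
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+    }
+
+    @Override
+    public void registerCompletedRequest(ResultInfo resultInfo) {
+        currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
+    }
+
+    @Override
+    public boolean shouldBlock(RequestInfo requestInfo) {
+        // Postpone new requests while the concurrency limit is reached
+        return currentInFlightRequests >= MAX_IN_FLIGHT_REQUESTS;
+    }
+
+    @Override
+    public int getMaxBatchSize() {
+        return MAX_BATCH_SIZE;
+    }
+}
+```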
+
+## Implementing a custom RateLimitingStrategy
+
+### [Example 1] CongestionControlRateLimitingStrategy
+
+The AsyncSinkBase comes pre-packaged with the 
CongestionControlRateLimitingStrategy. In this section, we explore its 
implementation.
+
+This strategy is modelled after [TCP congestion 
control](https://en.wikipedia.org/wiki/TCP_congestion_control), and aims to 
discover the highest possible request rate for the given destination. It 
achieves this by increasing the request rate until the sink gets throttled by 
the destination, at which point it will reduce the request rate.
+
+In this RateLimitingStrategy, we want to dynamically adjust the request rate 
by doing the following:
+
+* Set a maximum number of in-flight requests at any time
+* Set a maximum number of in-flight messages at any time (each request can 
have multiple messages)
+* When there is a successful request, we increase the maximum number of 
in-flight messages to maximise the request rate
+* When there is an unsuccessful request, we decrease the maximum number of 
in-flight messages to prevent overloading the destination
+* If there are multiple sink subtasks, each keeps track of its own maximum number of in-flight messages independently
+
+This strategy means that we will start with a low request rate (slow start) but aggressively increase it until the destination throttles us, which allows us to discover the highest possible request rate. It will also adjust the request rate if the conditions of the destination change (e.g. another client starts writing to the same destination). This strategy works well if the destination implements traffic shaping and throttles once its bandwidth limit is reached (e.g. Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose).
+
+First, we implement the information methods to keep track of the number of 
in-flight requests and in-flight messages.
+
+```java
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+    // ...
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+        currentInFlightMessages += requestInfo.getBatchSize();
+    }
+
+    @Override
+    public void registerCompletedRequest(ResultInfo resultInfo) {
+        currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
+        currentInFlightMessages = Math.max(0, currentInFlightMessages - resultInfo.getBatchSize());
+
+        if (resultInfo.getFailedMessages() > 0) {
+            maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
+        } else {
+            maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
+        }
+    }
+    // ...
+}
+```
+
+Then we implement the control methods to dynamically adjust the request rate.
+
+We keep a current value for `maxInFlightMessages` and `maxInFlightRequests`, and postpone any new requests once `maxInFlightRequests` or `maxInFlightMessages` has been reached.
+
+Every time a request completes, the CongestionControlRateLimitingStrategy checks whether there are any failed messages in the response. If there are, it decreases `maxInFlightMessages`; if there are none, it increases `maxInFlightMessages`. This gives us indirect control over the rate at which messages are written to the destination.
+
+Side note: The default CongestionControlRateLimitingStrategy uses an Additive Increase / Multiplicative Decrease (AIMD) scaling strategy. This is also used in TCP congestion control to avoid overloading the destination: the write rate is increased slowly, but backed off quickly when throttled.
+
+```java
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+    // ...
+    @Override
+    public void registerCompletedRequest(ResultInfo resultInfo) {
+        // ...
+        if (resultInfo.getFailedMessages() > 0) {
+            maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
+        } else {
+            maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
+        }
+    }
+
+    @Override
+    public boolean shouldBlock(RequestInfo requestInfo) {
+        return currentInFlightRequests >= maxInFlightRequests
+                || (currentInFlightMessages + requestInfo.getBatchSize() > maxInFlightMessages);
+    }
+    // ...
+}
+```
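+
+For reference, a minimal AIMD scaling strategy could look like the sketch below. (Flink 1.16 ships a ready-made `AIMDScalingStrategy` for this purpose; the constants here are purely illustrative.)
+
+```java
+public class SimpleAIMDScalingStrategy {
+
+    private final int increaseRate = 10;        // additive increase step (illustrative)
+    private final double decreaseFactor = 0.5;  // multiplicative decrease factor (illustrative)
+    private final int rateThreshold = 10_000;   // upper bound on the rate (illustrative)
+
+    public int scaleUp(int currentRate) {
+        // Additive increase: grow slowly, capped at the threshold
+        return Math.min(currentRate + increaseRate, rateThreshold);
+    }
+
+    public int scaleDown(int currentRate) {
+        // Multiplicative decrease: back off quickly, but never below 1
+        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
+    }
+}
+```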
+
+### [Example 2] TokenBucketRateLimitingStrategy
+
+The CongestionControlRateLimitingStrategy is rather aggressive, and relies on the destination having robust server-side rate limiting. If the destination does not provide robust server-side rate limiting, we can implement rate limiting on the client side instead.
+
+As an example, we can look at the [token bucket rate limiting 
strategy](https://en.wikipedia.org/wiki/Token_bucket). If the limits are set 
correctly, we will avoid overloading the destination altogether.
+
+In this strategy, we want to do the following:
+
+* Implement a TokenBucket that has a given initial number of tokens (e.g. 10). 
These tokens refill at a given rate (e.g. 1 token per second).
+* When preparing an async request, we check if the token bucket has sufficient 
tokens. If not, we postpone the request.
+
+Let’s look at an example implementation:
+
+```java
+public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final Bucket bucket;
+
+    public TokenBucketRateLimitingStrategy() {
+        // A bucket of 10 tokens, refilled at a rate of 1 token per second
+        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
+        Bandwidth limit = Bandwidth.classic(10, refill);
+        this.bucket = Bucket4j.builder()
+            .addLimit(limit)
+            .build();
+    }
+
+    // ... (information methods and getMaxBatchSize() omitted for brevity)
+
+    @Override
+    public boolean shouldBlock(RequestInfo requestInfo) {
+        // Block the request if the bucket cannot supply enough tokens
+        return !bucket.tryConsume(requestInfo.getBatchSize());
+    }
+}
+```
+
+In the above example, we use the token bucket implementation from the [Bucket4j](https://github.com/bucket4j/bucket4j) library, and we map 1 message to 1 token. Since our token bucket has a size of 10 tokens and a refill rate of 1 token per second, we can be sure that we will not exceed a burst of 10 messages, and will also not exceed a sustained rate of 1 message per second. Note that, unlike the CongestionControlRateLimitingStrategy, this strategy does not probe for the highest achievable request rate: it restricts throughput to the configured rate, trading peak throughput for predictable load on the destination.
+

Review Comment:
   I wonder if it makes sense to quickly summarize the characteristics of the strategy? It sounds like this strategy really restricts the throughput to a certain rate.



##########
_posts/2022-11-25-async-sink-rate-limiting-strategy.md:
##########
@@ -0,0 +1,195 @@
+### [Example 2] TokenBucketRateLimitingStrategy
+
+The CongestionControlRateLimitingStrategy is rather aggressive, and relies on the destination having robust server-side rate limiting. If the destination does not provide robust server-side rate limiting, we can implement rate limiting on the client side instead.

Review Comment:
   Is the TokenBucketRateLimitingStrategy also part of Flink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
