I'm on Flink 1.16.0. I found a related article from the community, which contains the following example: https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/#rationale-behind-the-ratelimitingstrategy-interface
public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

    private final Bucket bucket;

    public TokenBucketRateLimitingStrategy() {
        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
        Bandwidth limit = Bandwidth.classic(10, refill);
        this.bucket = Bucket4j.builder()
            .addLimit(limit)
            .build();
    }

    // ... (information methods not needed)

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return bucket.tryConsume(requestInfo.getBatchSize());
    }
}

However, the return value of this shouldBlock seems to be inverted. When I actually used it, the queue of the async thread pool filled up very quickly and a RejectedExecutionException was thrown.
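
To make the suspected inversion concrete, here is a minimal, dependency-free sketch. It assumes that shouldBlock is meant to return true when the request must be held back, and that tryConsume returns true when the tokens were successfully taken; under those assumptions the result has to be negated. SimpleTokenBucket and TokenBucketBlockDecision are hypothetical stand-ins written for this illustration; they are not the Bucket4j or Flink classes, and the injected clock exists only to keep the example deterministic.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a token bucket (NOT the Bucket4j implementation). */
final class SimpleTokenBucket {
    private final long capacity;
    private final long refillTokens;
    private final long refillPeriodNanos;
    private final LongSupplier clock; // injected so the behavior can be checked deterministically
    private long tokens;
    private long lastRefillNanos;

    SimpleTokenBucket(long capacity, long refillTokens, long refillPeriodNanos, LongSupplier clock) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillPeriodNanos = refillPeriodNanos;
        this.clock = clock;
        this.tokens = capacity; // the bucket starts full
        this.lastRefillNanos = clock.getAsLong();
    }

    /** Returns true if n tokens were available and have been consumed. */
    synchronized boolean tryConsume(long n) {
        refill();
        if (n > tokens) {
            return false;
        }
        tokens -= n;
        return true;
    }

    /** Adds refillTokens for every whole refill period that has elapsed, capped at capacity. */
    private void refill() {
        long periods = (clock.getAsLong() - lastRefillNanos) / refillPeriodNanos;
        if (periods > 0) {
            tokens = Math.min(capacity, tokens + periods * refillTokens);
            lastRefillNanos += periods * refillPeriodNanos;
        }
    }
}

/** Hypothetical class that mirrors only the shouldBlock decision discussed above. */
final class TokenBucketBlockDecision {
    private final SimpleTokenBucket bucket;

    TokenBucketBlockDecision(SimpleTokenBucket bucket) {
        this.bucket = bucket;
    }

    /** Assumed contract: true means "hold this request back". */
    boolean shouldBlock(int batchSize) {
        // Tokens consumed -> the request may proceed -> do not block, hence the negation.
        return !bucket.tryConsume(batchSize);
    }
}
```

With a capacity of 10 and a refill of 1 token per second, as in the example above, the un-negated version would report "block" exactly when tokens are available and "do not block" once the bucket is empty, which would be consistent with requests being submitted without any limit and the executor queue filling up. Note also that, in this sketch, a batch larger than the capacity can never be consumed, so the batch size would need to stay at or below 10.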