Flink 1.16.0

搜索到社区有相关文章,其中的实例如下:
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/#rationale-behind-the-ratelimitingstrategy-interface


public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

    

    private final Bucket bucket;



    public TokenBucketRateLimitingStrategy() {

        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));

        Bandwidth limit = Bandwidth.classic(10, refill);

        this.bucket = Bucket4j.builder()

            .addLimit(limit)

            .build();

    }

    

    // ... (information methods not needed)

    

    @Override

    public boolean shouldBlock(RequestInfo requestInfo) {

        return bucket.tryConsume(requestInfo.getBatchSize());

    }

    

}



我但是这个shouldblock的返回值似乎是反的,我实际使用后发现会发现异步的线程池的队列会很快被打满,抛出RejectedExecutionException。

回复