Thanks Zexian for driving this work. Rate limiting is a common requirement; TBH, we should have supported it at an earlier stage. The proposed design, which integrates rate limiting into the source operator lifecycle, already covers the vast majority of scenarios. Looks good from my side.
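To make sure I'm reading the design right, here is a rough sketch of how I picture the post-emission, non-blocking throttling sitting in the poll loop described in the proposal quoted below. The class, field, and helper names (ThrottledPollLoopSketch, pendingPermits, emitNextBatch) are mine for illustration only, and I'm assuming the existing RateLimiter / RateLimiterStrategy utilities from flink-connector-base as building blocks rather than whatever the FLIP finally introduces:

// Sketch only, not the FLIP's actual code: a simplified poll loop showing
// post-emission, non-blocking throttling on top of the existing
// RateLimiter / RateLimiterStrategy utilities.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.core.io.InputStatus;

class ThrottledPollLoopSketch<T> {

    private final RateLimiter rateLimiter;
    // Non-null while permits for the previously emitted records are still pending.
    private CompletionStage<Void> pendingPermits;

    ThrottledPollLoopSketch(RateLimiterStrategy strategy, int parallelism) {
        // Mirrors the proposed constructor argument: the strategy is resolved
        // into one RateLimiter per reader.
        this.rateLimiter = strategy.createRateLimiter(parallelism);
    }

    InputStatus pollNext(ReaderOutput<T> output) {
        // If the rate limit was hit, yield back to the task's event loop instead
        // of blocking, so checkpointing and other mailbox actions are never delayed.
        if (pendingPermits != null && !pendingPermits.toCompletableFuture().isDone()) {
            return InputStatus.MORE_AVAILABLE;
        }
        pendingPermits = null;

        // Emit whatever the fetcher handed us; the batch size is unknown upfront.
        int emitted = emitNextBatch(output);

        // Post-emission throttling: only after the exact record count is known
        // do we ask the RateLimiter for that many permits, without blocking.
        if (emitted > 0) {
            CompletionStage<Void> permits = CompletableFuture.completedFuture(null);
            for (int i = 0; i < emitted; i++) {
                permits = permits.thenCompose(ignored -> rateLimiter.acquire());
            }
            pendingPermits = permits;
        }
        return InputStatus.MORE_AVAILABLE;
    }

    private int emitNextBatch(ReaderOutput<T> output) {
        // Placeholder for the real fetch-and-emit logic in SourceReaderBase
        // (one recordEmitter.emitRecord(...) per record); returns the record count.
        return 0;
    }
}

If that matches the intent, I like that the wait is expressed as a CompletionStage, so the task thread itself never blocks.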
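And on the SQL/Table API side, I take 'scan.rate.limit' to be set like any other connector option. The table definition below is just an illustrative Kafka example of mine, not taken from the FLIP:

// Sketch of the proposed 'scan.rate.limit' option in a table definition;
// everything except that option is a generic Kafka example for illustration.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RateLimitedTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        // Proposed option from the FLIP: a record-based rate limit for this source.
                        + "  'scan.rate.limit' = '1000'\n"
                        + ")");
    }
}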
Best,
Leonard

> On Jul 18, 2025, at 12:01, Zexian WU <wzx3122351...@gmail.com> wrote:
>
> Hi everyone,
>
> I would like to start a discussion on a new Flink Improvement Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.
> The full proposal can be found on the wiki:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
>
> Motivation
>
> In many production environments, Flink sources read from shared external systems (like Kafka, Pulsar, or databases) where resources are limited. An uncontrolled data ingestion rate can lead to critical issues:
> - Resource Contention: A high-throughput Flink job can saturate a shared Kafka cluster's bandwidth, starving other essential services.
> - System Instability: Aggressive polling from a database (e.g., MySQL) can overwhelm its IOPS, degrading performance for transactional queries.
>
> This proposal aims to provide a built-in, precise, and easy-to-use rate-limiting mechanism for Flink Sources to ensure system stability and fair resource sharing.
>
> Proposed Solution
>
> The core idea is to integrate a flexible, record-based rate-limiting mechanism directly into SourceReaderBase, making it available to all connectors built on the new source interface.
>
> Key Changes:
>
> Seamless Integration via SourceReaderBase:
> New constructors accepting a RateLimiterStrategy will be added directly to SourceReaderBase. This allows any connector built on the modern source interface (like Kafka or Pulsar) to enable rate limiting with minimal code changes.
>
> Accurate, Post-Emission Throttling:
> To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting is applied after records are emitted. After each recordEmitter.emitRecord call, the reader counts the emitted records and only then consults the RateLimiter. This ensures throttling is based on the precise number of records processed.
>
> Fully Non-Blocking Operation:
> The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control to the Flink task's event loop without blocking the processing thread, ensuring that critical operations like checkpointing are never delayed.
>
> Unified Configuration via SQL/Table API:
> Users can configure rate limits consistently across different sources using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This provides a unified and user-friendly experience for pipeline tuning.
>
> The design is backward-compatible, and existing custom sources will continue to work without any modification.
>
> I believe this feature will be a valuable addition for Flink users operating in complex, multi-tenant environments. I'm looking forward to your feedback, suggestions, and any potential concerns you might have.
>
> Thanks,
> Zexian Wu