Praveenkumar76 opened a new pull request, #28: URL: https://github.com/apache/pulsar-connectors/pull/28
Fixes [apache#25135](https://github.com/apache/pulsar/issues/25135)

### Motivation

The `FileSource` connector currently initializes its internal queues (`workQueue`, `inProcess`, and `recentlyProcessed`) using unbounded `LinkedBlockingQueue` instances. Under heavy workloads, the `FileListingThread` can scan and enqueue files significantly faster than the worker threads can process them. Since the queues have no capacity limits, pending file tasks accumulate indefinitely in memory, eventually resulting in a fatal `java.lang.OutOfMemoryError`.

This creates a classic unbounded producer-consumer backpressure issue where the producer rate is unconstrained by downstream processing throughput.

The issue was reproduced locally by:

- Running the connector with a constrained JVM heap
- Generating 500,000 dummy files
- Slowing file processing relative to directory scanning

This consistently triggered heap exhaustion during file listing.

### Modifications

#### Queue Backpressure

- Added `maxQueueSize` configuration to `FileSourceConfig`
- Introduced a bounded queue capacity with a default value of `1000`
- Updated `FileSource` to initialize the following using bounded `LinkedBlockingQueue` instances:
  - `workQueue`
  - `inProcess`
  - `recentlyProcessed`

This ensures the file listing thread naturally blocks when downstream processing cannot keep up, preventing unbounded memory growth.
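The blocking behavior described above can be sketched in isolation. The following is a minimal, self-contained illustration, not the connector's actual code: the class `BoundedQueueSketch` and its methods are hypothetical names, and it assumes the producer enqueues with `put()`, which blocks while a bounded `LinkedBlockingQueue` is full (`add()` would throw and `offer()` would return `false` instead). It also includes a capacity check of the kind this PR describes for `maxQueueSize`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names are illustrative and not taken from the connector.
final class BoundedQueueSketch {

    // Assumed constant mirroring the default of 1000 stated in this PR.
    static final int DEFAULT_MAX_QUEUE_SIZE = 1000;

    // Rejects non-positive capacities (<= 0).
    static int validateMaxQueueSize(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            throw new IllegalArgumentException(
                    "maxQueueSize must be greater than 0, got " + maxQueueSize);
        }
        return maxQueueSize;
    }

    // Creates a queue that holds at most maxQueueSize elements.
    static <T> BlockingQueue<T> newBoundedQueue(int maxQueueSize) {
        return new LinkedBlockingQueue<>(validateMaxQueueSize(maxQueueSize));
    }

    // Runs a fast producer against a slow consumer and returns the largest
    // queue size the producer observed right after each put().
    static int runDemo(int capacity, int items) {
        BlockingQueue<Integer> workQueue = newBoundedQueue(capacity);
        AtomicInteger maxObserved = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    workQueue.put(i); // blocks while the queue is full
                    maxObserved.accumulateAndGet(workQueue.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < items; i++) {
                workQueue.take(); // blocks while the queue is empty
                Thread.sleep(1);  // simulate slow file processing
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return maxObserved.get();
    }

    public static void main(String[] args) {
        System.out.println("max observed queue size: " + runDemo(5, 50));
    }
}
```

With a capacity of 5, the producer can never get more than five items ahead of the consumer, so the reported maximum stays at or below 5 no matter how many items are enqueued.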
#### Configuration Validation

- Added validation to reject invalid queue sizes (`<= 0`)
- Added configuration tests to verify:
  - default values
  - custom values
  - invalid configuration handling

#### Stability Improvements

- Introduced backpressure between the `FileListingThread` and the file processing workers
- Prevents uncontrolled queue accumulation during:
  - massive directory scans
  - slow I/O
  - large file processing workloads

### Verifying this change

Verified manually in a local environment using:

- Apache Pulsar FileSource connector
- Restricted heap size (`-Xmx64m`)
- Large-scale directory generation (`500,000` files)

Results:

- Before this change:
  - heap usage continuously increased
  - connector terminated with `java.lang.OutOfMemoryError`
- After this change:
  - memory usage remained stable
  - queue growth was bounded
  - file listing paused under load due to backpressure

### Tests added

- Added `FileSourceConfigTest` coverage for:
  - default `maxQueueSize`
  - custom queue size configuration
  - invalid values (`<= 0`)
- Confirmed existing FileSource behavior remains unchanged under normal workloads

### Does this pull request potentially affect one of the following parts:

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [x] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
