Praveenkumar76 opened a new pull request, #28:
URL: https://github.com/apache/pulsar-connectors/pull/28

   Fixes [apache#25135](https://github.com/apache/pulsar/issues/25135)
   
   ### Motivation
   
   The `FileSource` connector currently initializes its internal queues 
(`workQueue`, `inProcess`, and `recentlyProcessed`) using unbounded 
`LinkedBlockingQueue` instances.
   
   Under heavy workloads, the `FileListingThread` can scan and enqueue files 
significantly faster than the worker threads can process them. Since the queues 
have no capacity limits, pending file tasks accumulate indefinitely in memory, 
eventually resulting in a fatal `java.lang.OutOfMemoryError`.
   
   This is a classic producer-consumer problem with no backpressure: the 
producer's rate is not constrained by downstream processing throughput.
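The following is a minimal, standalone illustration of that difference. It is not the connector's code, and the class and method names are hypothetical. A `LinkedBlockingQueue` built with the no-arg constructor has a capacity of `Integer.MAX_VALUE`, so it accepts every item. One built with an explicit capacity makes `offer()` return `false` once it is full.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {

    // Tries to enqueue `attempts` items and counts how many offer() calls succeed.
    static int acceptedOffers(BlockingQueue<String> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: it returns false once the queue is full.
            if (queue.offer("file-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No-arg constructor: capacity is Integer.MAX_VALUE, i.e. effectively unbounded.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        // Explicit capacity; 3 is an arbitrary illustrative value.
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(3);

        System.out.println("unbounded capacity: " + unbounded.remainingCapacity());
        System.out.println("unbounded accepted " + acceptedOffers(unbounded, 5) + " of 5");
        System.out.println("bounded accepted " + acceptedOffers(bounded, 5) + " of 5");
    }
}
```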
   
   The issue was reproduced locally by:
   
   - Running the connector with a constrained JVM heap
   - Generating 500,000 dummy files
   - Slowing file processing relative to directory scanning
   
   This consistently triggered heap exhaustion during file listing.
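The PR does not say how the dummy files were generated. The sketch below is one hypothetical way to produce similar input in the directory the connector reads from; `DummyFileGenerator` and its file-naming scheme are illustrative only. Each file gets one short line of content rather than being left empty. This assumes the connector's size-related settings might skip empty files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class DummyFileGenerator {

    // Writes `count` small files named dummy-<n>.txt into `dir`, creating the
    // directory if needed. Existing files with the same name are overwritten.
    static void generate(Path dir, int count) throws IOException {
        Files.createDirectories(dir);
        for (int i = 0; i < count; i++) {
            // One short line per file rather than an empty file (assumption, see lead-in).
            Files.writeString(dir.resolve("dummy-" + i + ".txt"), "line " + i + "\n");
        }
    }

    // Counts the entries directly inside `dir`.
    static long countFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Arguments: [dir] [count]; defaults are a fresh temp directory and
        // the 500,000 files mentioned above.
        Path dir = args.length > 0 ? Path.of(args[0]) : Files.createTempDirectory("filesource-input");
        int count = args.length > 1 ? Integer.parseInt(args[1]) : 500_000;
        generate(dir, count);
        System.out.println(dir + " now contains " + countFiles(dir) + " entries");
    }
}
```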
   
   
   ### Modifications
   
   #### Queue Backpressure
   
   - Added `maxQueueSize` configuration to `FileSourceConfig`
   - Introduced a bounded queue capacity with a default value of `1000`
   - Updated `FileSource` to initialize the following as bounded `LinkedBlockingQueue` instances:
     - `workQueue`
     - `inProcess`
     - `recentlyProcessed`
   
   This ensures the file listing thread naturally blocks when downstream 
processing cannot keep up, preventing unbounded memory growth.
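The following self-contained sketch shows the blocking behavior described above. It assumes the listing thread enqueues with `BlockingQueue.put()`, which waits for free space. `offer()`, by contrast, returns `false` immediately when the queue is full. `BackpressureDemo` and its parameters are hypothetical stand-ins for `FileListingThread` and a worker thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BackpressureDemo {

    // Runs one producer (standing in for the listing thread) and one slow
    // consumer (standing in for a worker) over a bounded queue. Returns the
    // largest queue size the producer observed after each put().
    static int run(int capacity, int items) throws InterruptedException {
        BlockingQueue<Integer> workQueue = new LinkedBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    workQueue.take();
                    Thread.sleep(1); // simulate slow file processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxSeen = 0;
        for (int i = 0; i < items; i++) {
            workQueue.put(i); // blocks while the queue is at capacity
            maxSeen = Math.max(maxSeen, workQueue.size());
        }
        consumer.join();
        return maxSeen;
    }

    public static void main(String[] args) throws InterruptedException {
        int capacity = 4;
        int maxSeen = run(capacity, 100);
        System.out.println("capacity " + capacity + ", max observed size " + maxSeen);
    }
}
```

However fast the producer runs, the observed size never exceeds the capacity, because `put()` parks the producer until the consumer frees a slot.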
   
   #### Configuration Validation
   
   - Added validation to reject invalid queue sizes (`<= 0`)
   - Added configuration tests to verify:
     - default values
     - custom values
     - invalid configuration handling
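The following is a minimal sketch of the validation rule. It uses a hypothetical `FileSourceConfigSketch` class rather than the real `FileSourceConfig`, whose field types and loading logic are not shown in this PR. Rejecting `<= 0` up front is consistent with `LinkedBlockingQueue(int)`, which itself throws `IllegalArgumentException` for a non-positive capacity.

```java
class FileSourceConfigSketch {
    // Hypothetical default mirroring the value of 1000 described in this PR.
    static final int DEFAULT_MAX_QUEUE_SIZE = 1000;

    private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;

    int getMaxQueueSize() {
        return maxQueueSize;
    }

    void setMaxQueueSize(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Rejects non-positive sizes before any queue is constructed.
    void validate() {
        if (maxQueueSize <= 0) {
            throw new IllegalArgumentException("maxQueueSize must be > 0, got " + maxQueueSize);
        }
    }

    public static void main(String[] args) {
        FileSourceConfigSketch config = new FileSourceConfigSketch();
        config.validate(); // the default passes
        System.out.println("default maxQueueSize = " + config.getMaxQueueSize());

        config.setMaxQueueSize(0);
        try {
            config.validate();
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```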
   
   #### Stability Improvements
   
   - Introduced proper backpressure behavior between:
     - `FileListingThread`
     - file processing workers
   
   - Prevents uncontrolled queue accumulation during:
     - massive directory scans
     - slow I/O
     - large file processing workloads
   
   
   ### Verifying this change
   
   **Manual verification:** tested locally using:
   
   - Apache Pulsar FileSource connector
   - Restricted heap size (`-Xmx64m`)
   - Large-scale directory generation (`500,000` files)
   
   - Before this change:
     - heap usage continuously increased
     - connector terminated with `java.lang.OutOfMemoryError`
   
   - After this change:
     - memory usage remained stable
     - queue growth was bounded
     - file listing naturally paused under load due to backpressure
   
   
   ### Tests added
   
   - Added `FileSourceConfigTest` coverage for:
     - default `maxQueueSize`
     - custom queue size configuration
     - rejection of invalid values (`<= 0`)
   
   - Confirmed existing FileSource behavior remains unchanged under normal 
workloads
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [x] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment