QiuYucheng2003 opened a new issue, #25135: URL: https://github.com/apache/pulsar/issues/25135
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

Broker version: Master
Java version: Java8

### Issue Description

The FileSource connector (org.apache.pulsar.io.file.FileSource) initializes internal blocking queues (workQueue, inProcess, recentlyProcessed) using the default new LinkedBlockingQueue<>() constructor.

Internal queues should have a bounded capacity to provide backpressure. This prevents memory exhaustion when the producer (file listing) creates tasks faster than the consumer (file processing) can handle them.

The default constructor sets the capacity to Integer.MAX_VALUE. If the FileListingThread scans files significantly faster than the FileConsumerThread can process them (e.g., large files or slow I/O), the workQueue will grow indefinitely, eventually leading to a java.lang.OutOfMemoryError.

This is a classic "Unbounded Size Cache Queue" (UBSCQ) pattern. It lacks protection against traffic spikes or slow consumption rates.

### Error messages

```text
java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at org.apache.pulsar.io.file.FileListingThread.run(FileListingThread.java:...)
```

### Reproducing the issue

1. Configure a FileSource connector to monitor a directory.
2. Generate a scenario where the source directory contains a massive number of files (high production rate).
3. Ensure the file processing logic is slower than the disk listing speed (simulation of backpressure accumulation).
4. Monitor the JVM Heap memory; the workQueue object size will grow until OOM occurs.
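To make the capacity claim in the description concrete, here is a minimal standalone sketch (not code from the Pulsar repository). The class name `QueueCapacityDemo` and the placeholder file names are hypothetical. It only uses `java.util.concurrent.LinkedBlockingQueue` to show that the no-arg constructor leaves `Integer.MAX_VALUE` slots available, while a bounded queue stops accepting `offer()` calls once it is full.

```java
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueCapacityDemo {

    // Remaining capacity of an empty queue built with the no-arg constructor.
    static int defaultCapacity() {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity();
    }

    // Counts how many of `attempts` non-blocking offers a queue with the
    // given capacity accepts when nothing is consuming from it.
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // File objects are placeholders; the file system is never touched.
            if (queue.offer(new File("file-" + i))) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(acceptedOffers(3, 10));                  // 3
    }
}
```

With the default constructor, the only practical limit on queue growth is the heap, which matches the failure mode described above.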
Relevant code:

Location: pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java

    // Lines 39-41: Queues are unbounded
    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
    private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();

    // Line 49: The thread count is fixed, but the pool's internal task queue is also unbounded
    executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);

### Additional information

Suggestion for fix: Replace the unbounded initialization with a configurable or fixed capacity to enable blocking behavior (backpressure) when the queue is full.

    // Example fix
    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(10000);

This ensures the FileListingThread blocks when the consumer cannot keep up, preventing OOM.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
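One caveat about the suggested fix: a bounded `LinkedBlockingQueue` only blocks the producer when items are inserted with `put()` (or a timed `offer()`); a plain `offer()` on a full queue returns `false` immediately. The sketch below is a standalone illustration under that assumption, not the connector's actual threads. The class `BackpressureDemo`, the method `run`, and the 1 ms delay standing in for slow file processing are all hypothetical.

```java
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BackpressureDemo {

    // Runs a fast producer against a slow consumer over a bounded queue.
    // Returns {items consumed, largest queue depth the producer observed}.
    static int[] run(int capacity, int totalFiles) {
        BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(capacity);
        AtomicInteger maxDepth = new AtomicInteger();

        // Stand-in for a listing thread: put() waits while the queue is full.
        Thread lister = new Thread(() -> {
            try {
                for (int i = 0; i < totalFiles; i++) {
                    workQueue.put(new File("file-" + i));
                    maxDepth.accumulateAndGet(workQueue.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        lister.start();

        int consumed = 0;
        try {
            // Stand-in for a consumer thread that is slower than the lister.
            while (consumed < totalFiles) {
                workQueue.take();
                consumed++;
                Thread.sleep(1); // simulated processing time
            }
            lister.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return new int[] {consumed, maxDepth.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 50);
        System.out.println("consumed=" + result[0] + ", maxDepth=" + result[1]);
    }
}
```

Every item is still delivered, but the queue depth never exceeds the configured capacity, so memory use stays bounded no matter how far the producer runs ahead.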
