healchow opened a new issue, #12073:
URL: https://github.com/apache/inlong/issues/12073
### What happened
Currently, when querying MQ messages from multiple Pulsar clusters, there
are several issues:
1. **No thread pool management**: Message queries are executed without
proper thread pool management, which may lead to resource exhaustion under high
concurrency.
2. **No graceful handling for task rejection**: When too many concurrent
requests come in, the system doesn't properly handle the
`RejectedExecutionException` and doesn't provide a user-friendly error response.
3. **No task cancellation mechanism**: When task submission fails,
previously submitted tasks continue to run unnecessarily.
4. **No interruption support**: Long-running IO operations cannot be
cancelled when the request is aborted.
### What you expected to happen
1. **Add a dedicated thread pool**: Use `ThreadPoolTaskExecutor` with
configurable core/max pool size and queue capacity for message query tasks.
2. **Add proper error handling**:
   - Add a new error code `PULSAR_QUERY_REJECTED(2610)` for task rejection
     scenarios.
   - Return a proper error code and message to the caller:
     `{"success": false, "errCode": 2610, "errMsg": "Query task rejected: too many concurrent requests, please try again later"}`
3. **Implement task cancellation**: When `RejectedExecutionException`
occurs, cancel all previously submitted tasks to free up resources.
4. **Add interruption checks**: Check
`Thread.currentThread().isInterrupted()` before and after IO operations to
support task cancellation.
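
The four points above could fit together roughly as in the sketch below. It is only an illustration under stated assumptions, not the actual InLong Manager change: it uses the plain JDK `ThreadPoolExecutor` as a stand-in for Spring's `ThreadPoolTaskExecutor` so that it runs without extra dependencies, and the names `PulsarQuerySketch`, `QueryRejectedException`, `newPool`, `submitAll`, and `interruptible` are hypothetical. Only the error code `2610` and the error message come from this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch class; not part of the InLong code base.
class PulsarQuerySketch {

    // Error code proposed in this issue.
    static final int PULSAR_QUERY_REJECTED = 2610;

    // Hypothetical exception that carries the error code back to the caller.
    static class QueryRejectedException extends RuntimeException {
        final int errCode;

        QueryRejectedException(int errCode, String msg, Throwable cause) {
            super(msg, cause);
            this.errCode = errCode;
        }
    }

    // Bounded pool. The default AbortPolicy throws RejectedExecutionException
    // once all threads are busy and the queue is full.
    static ThreadPoolExecutor newPool(int core, int max, int queueCapacity) {
        return new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    // Submits one task per cluster. If any submission is rejected, cancels the
    // tasks submitted so far and reports the rejection with the error code.
    static <T> List<Future<T>> submitAll(ThreadPoolExecutor pool, List<Callable<T>> tasks) {
        List<Future<T>> submitted = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                submitted.add(pool.submit(task));
            }
            return submitted;
        } catch (RejectedExecutionException e) {
            for (Future<T> future : submitted) {
                future.cancel(true); // interrupts the task if it is already running
            }
            throw new QueryRejectedException(PULSAR_QUERY_REJECTED,
                    "Query task rejected: too many concurrent requests, please try again later", e);
        }
    }

    // Wraps an IO call with interruption checks before and after it.
    static <T> Callable<T> interruptible(Callable<T> io) {
        return () -> {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("cancelled before IO");
            }
            T result = io.call();
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("cancelled after IO");
            }
            return result;
        };
    }
}
```

A caller would catch `QueryRejectedException` at the API boundary and map `errCode` and the message into the response body. Cancelling with `cancel(true)` only helps if the task cooperates, which is why the interruption checks sit around the IO call.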
### How to reproduce
Submit message query requests at a high rate against a multi-cluster Pulsar
setup. Later requests time out during submission, yet their tasks stay queued
and are still executed, even though the results are no longer needed and the
tasks should be discarded.
### Environment
_No response_
### InLong version
master
### InLong Component
InLong Manager
### Are you willing to submit PR?
- [x] Yes, I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)