This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 94ba5e14f8 [ISSUE #10031] Add PreprocessHandler interface in AllocateMappedFileService
94ba5e14f8 is described below
commit 94ba5e14f8c5706f0adc7f04be5e243cde7929d1
Author: guyinyou <[email protected]>
AuthorDate: Wed Jan 21 10:07:51 2026 +0800
[ISSUE #10031] Add PreprocessHandler interface in AllocateMappedFileService
Change-Id: I4e81916a79f89c095ffb7b860c8ccd49e88c76ea
Co-authored-by: guyinyou <[email protected]>
---
.../rocketmq/store/AllocateMappedFileService.java | 34 ++++++++++++++++++++++
1 file changed, 34 insertions(+)
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 7664e284ec..85042fdbc9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -45,12 +45,31 @@ public class AllocateMappedFileService extends ServiceThread {
new PriorityBlockingQueue<>();
private volatile boolean hasException = false;
private DefaultMessageStore messageStore;
+ private PreprocessHandler preprocessHandler;
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
+ /**
+ * Set preprocess handler for external extension
+ *
+ * @param preprocessHandler the preprocess handler
+ */
+ public void setPreprocessHandler(PreprocessHandler preprocessHandler) {
+ this.preprocessHandler = preprocessHandler;
+ }
+
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
+ // Execute preprocess logic if handler is set
+ final PreprocessHandler finalPreprocessHandler = this.preprocessHandler;
+ if (finalPreprocessHandler != null) {
+ try {
+ finalPreprocessHandler.preprocess(nextFilePath, nextNextFilePath, fileSize);
+ } catch (Throwable t) {
+ log.warn("Preprocess handler in AllocateMappedFileService execution failed", t);
+ }
+ }
int canSubmitRequests = 2;
if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
@@ -230,6 +249,21 @@ public class AllocateMappedFileService extends ServiceThread {
return true;
}
+ /**
+ * Preprocess handler interface for external extension
+ */
+ @FunctionalInterface
+ public interface PreprocessHandler {
+ /**
+ * Preprocess before allocating mapped file
+ *
+ * @param nextFilePath the next file path
+ * @param nextNextFilePath the next next file path
+ * @param fileSize the file size
+ */
+ void preprocess(String nextFilePath, String nextNextFilePath, int fileSize);
+ }
+
static class AllocateRequest implements Comparable<AllocateRequest> {
// Full file path
private String filePath;