This is an automated email from the ASF dual-hosted git repository.

lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 61c493c068 [ISSUE #10334] Support native CqCompactionFilter with 
cross-platform JNI shim (#10335)
61c493c068 is described below

commit 61c493c068db333c268c554f5515f2ffc975a0b1
Author: lizhimins <[email protected]>
AuthorDate: Thu May 21 10:12:14 2026 +0800

    [ISSUE #10334] Support native CqCompactionFilter with cross-platform JNI 
shim (#10335)
---
 .github/workflows/maven.yaml                       |   8 +-
 common/pom.xml                                     |   4 +-
 .../common/config/AbstractRocksDBStorage.java      |  23 +-
 docs/en/Native_RocksDB.md                          | 290 ++++++++++++++++++++
 pom.xml                                            |   8 +-
 store/pom.xml                                      |   4 +
 .../ConsumeQueueCompactionFilterFactory.java       |  47 ----
 .../store/rocksdb/ConsumeQueueRocksDBStorage.java  |  72 ++++-
 .../store/rocksdb/CqCompactionFilterJni.java       | 228 ++++++++++++++++
 .../store/rocksdb/NativeCqCompactionFilter.java    |  38 +++
 .../store/rocksdb/RocksDBOptionsFactory.java       |   4 +-
 .../main/resources/native/cq_compaction_filter.cpp | 294 +++++++++++++++++++++
 .../main/resources/native/cq_compaction_filter.dll | Bin 0 -> 136192 bytes
 .../resources/native/libcq_compaction_filter.dylib | Bin 0 -> 41776 bytes
 .../resources/native/libcq_compaction_filter.so    | Bin 0 -> 23848 bytes
 .../native/libcq_compaction_filter_aarch64.so      | Bin 0 -> 74824 bytes
 .../store/rocksdb/CqCompactionFilterJniTest.java   | 184 +++++++++++++
 17 files changed, 1129 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 4eacd65b84..aaf165420c 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -27,7 +27,13 @@ jobs:
           distribution: "corretto"
           cache: "maven"
       - name: Build with Maven
-        run: mvn -B package --file pom.xml
+        shell: bash
+        run: |
+          JACOCO_FLAG=""
+          if [ "${{ matrix.os }}" != "ubuntu-latest" ]; then
+            JACOCO_FLAG="-Djacoco.skip=true"
+          fi
+          mvn -B package --file pom.xml $JACOCO_FLAG
 
       - name: Upload Auth JVM crash logs
         if: failure()
diff --git a/common/pom.xml b/common/pom.xml
index eec7118414..b931afcb89 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -113,8 +113,8 @@
             <artifactId>rocketmq-logback-classic</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-rocksdb</artifactId>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index bc4a18006f..4875ce43e2 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -427,22 +427,35 @@ public abstract class AbstractRocksDBStorage {
         if (!hold()) {
             return;
         }
-        long s1 = System.currentTimeMillis();
+        long before = getEstimateNumKeys();
+        long startMs = System.currentTimeMillis();
         boolean result = true;
         try {
-            LOGGER.info("manualCompaction Start. {}", this.dbPath);
+            LOGGER.info("ManualCompaction started, dbPath={}, 
estimateNumKeys={}", this.dbPath, before);
             this.db.compactRange(this.defaultCFHandle, null, null, 
compactRangeOptions);
         } catch (RocksDBException e) {
             result = false;
             scheduleReloadRocksdb(e);
-            LOGGER.error("manualCompaction Failed. {}, {}", this.dbPath, 
getStatusError(e));
+            LOGGER.error("ManualCompaction failed, dbPath={}, error={}", 
this.dbPath, getStatusError(e));
         } finally {
             release();
-            LOGGER.info("manualCompaction End. {}, rt: {}(ms), result: {}", 
this.dbPath, System.currentTimeMillis() - s1, result);
+            long after = getEstimateNumKeys();
+            long elapsed = System.currentTimeMillis() - startMs;
+            String ratio = before > 0 ? String.format("%.1f", (1.0 - (double) 
after / before) * 100) : "0.0";
+            LOGGER.info("ManualCompaction finished, dbPath={}, elapsed={}ms, 
success={}, before={}, after={}, reduced={}%",
+                this.dbPath, elapsed, result, before, after, ratio);
         }
     }
 
-    protected void manualCompaction(long minPhyOffset, final 
CompactRangeOptions compactRangeOptions) {
+    private long getEstimateNumKeys() {
+        try {
+            return this.db.getLongProperty(this.defaultCFHandle, 
"rocksdb.estimate-num-keys");
+        } catch (RocksDBException e) {
+            return -1L;
+        }
+    }
+
+    protected void manualCompaction(final CompactRangeOptions 
compactRangeOptions) {
         this.manualCompactionThread.submit(new Runnable() {
             @Override
             public void run() {
diff --git a/docs/en/Native_RocksDB.md b/docs/en/Native_RocksDB.md
new file mode 100644
index 0000000000..f0d43a8137
--- /dev/null
+++ b/docs/en/Native_RocksDB.md
@@ -0,0 +1,290 @@
+# Native RocksDB ConsumeQueue Compaction Filter
+
+## Background
+
+RocketMQ previously depended on a custom-forked RocksDB Java binding published 
as `org.apache.rocketmq:rocketmq-rocksdb:1.0.6`. This fork was maintained in 
the `apache/rocketmq-externals` repository and was essentially a republished 
copy of `org.rocksdb:rocksdbjni` with exactly **one** additional class:
+
+- `org.rocksdb.RemoveConsumeQueueCompactionFilter` — a RocksDB compaction 
filter that removes stale consume queue entries during compaction. Its C++ 
implementation and JNI glue lived in the forked C++ source tree under 
`utilities/compaction_filters/remove_consumequeue_compactionfilter.*` and 
`java/rocksjni/remove_consumequeue_compactionfilterjni.cc`.
+
+All other RocketMQ subsystems using RocksDB (Pop consumption state, config 
storage, index storage, timer storage, transaction half-message storage) used 
only standard RocksDB Java APIs and had no dependency on the fork's custom code.
+
+## Problem
+
+Maintaining a fork of RocksDB's Java bindings has several costs:
+
+1. **Upgrade friction** — every RocksDB upstream release requires rebuilding 
the entire fork to pick up the new native library and Java API
+2. **Native build complexity** — the fork bundles a full C++ build pipeline 
for multiple platforms (Linux glibc/musl, macOS, Windows)
+3. **Dependency duplication** — the `rocksdb/` module in the RocketMQ source 
tree duplicates ~190 classes that are identical to upstream `rocksdbjni`
+4. **License ambiguity** — the fork republishes Facebook's RocksDB code under 
the Apache group
+
+## Solution
+
+Replace `rocketmq-rocksdb` with the official `org.rocksdb:rocksdbjni:8.4.4` 
and move the single custom compaction filter into a standalone native shim.
+
+### Why a native shim is needed
+
+The official `rocksdbjni` provides a 
`ColumnFamilyOptions.setCompactionFilter(AbstractCompactionFilter)` method, but 
its `AbstractCompactionFilter` Java class requires a native handle (raw 
`rocksdb::CompactionFilter*` pointer) passed to its constructor. The Java 
`filter()` method callback goes through a C++ trampoline that RocksDB's JNI 
layer manages internally — you can only subclass it from within the same JNI 
compilation unit.
+
+To implement a custom compaction filter outside the `rocksdbjni` build, we 
create a standalone C++ shared library that:
+- Directly subclasses `rocksdb::CompactionFilter` in C++
+- Exposes JNI methods to create filter instances and update the `minPhyOffset` 
threshold
+- Returns the raw `CompactionFilter*` pointer as a `jlong` to Java
+
+### Architecture
+
+```
+┌──────────────────────────────────────────────────────┐
+│  ConsumeQueueRocksDBStorage (Java)                   │
+│  - CqCompactionFilterJni.createAndSetFilter(cqCfOpts)│
+│  - CqCompactionFilterJni.setMinPhyOffset(offset)     │
+└──────────────────┬───────────────────────────────────┘
+                   │
+                   ▼
+┌──────────────────────────────────────────────────────┐
+│  CqCompactionFilterJni.java                          │
+│  - Extracts libcq_compaction_filter.so to the same   │
+│    temp dir as the already-loaded rocksdbjni .so     │
+│  - Uses NativeCqCompactionFilter wrapper with        │
+│    disOwnNativeHandle() + public setCompactionFilter │
+│  - Calls native createNativeFilter0() → raw pointer  │
+└──────────────────┬───────────────────────────────────┘
+                   │
+                   ▼
+┌──────────────────────────────────────────────────────┐
+│  libcq_compaction_filter.so (native shim)            │
+│                                                      │
+│  class CqCompactionFilter                            │
+│    : public rocksdb::CompactionFilter { ... }        │
+│                                                      │
+│  JNI: createNativeFilter0() → new CqCompactionFilter │
+│  JNI: setMinPhyOffset0(ptr, offset)                  │
+│                                                      │
+│  NEEDED: librocksdbjni-linux64.so ($ORIGIN RPATH)    │
+└──────────────────┬───────────────────────────────────┘
+                   │
+                   ▼
+┌──────────────────────────────────────────────────────┐
+│  librocksdbjni-linux64.so (official rocksdbjni)      │
+│  - All RocksDB C++ classes (CompactionFilter, etc.)  │
+│  - JNI glue for all Java↔C++ bindings                │
+│  - Compiled with -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0│
+└──────────────────────────────────────────────────────┘
+```
+
+### Key design decisions
+
+**1. Direct C++ subclassing with explicit linking**
+
+The shim directly subclasses `rocksdb::CompactionFilter` in C++ and is 
compiled with matching ABI flags (`-fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0`) to 
match how `librocksdbjni` was built. It is explicitly linked against 
`librocksdbjni-linux64.so` (extracted from the `rocksdbjni` jar) with `$ORIGIN` 
RPATH so the dynamic linker resolves symbols from the same directory.
+
+This replaced an earlier dlopen/RTLD_GLOBAL approach that caused C++ `double 
free` crashes — loading the same `.so` twice (once via JVM's `RTLD_LOCAL` and 
once via `RTLD_GLOBAL`) creates conflicting C++ global state (memory 
allocators, static singletons, vtables).
+
+**2. Raw pointer as jlong, wrapped with disOwnNativeHandle()**
+
+The native shim creates `new CqCompactionFilter()` and returns the raw C++ 
pointer as a `jlong`. A thin Java wrapper `NativeCqCompactionFilter extends 
AbstractCompactionFilter<Slice>` passes this pointer to the protected 
`AbstractCompactionFilter(long)` constructor, then calls `disOwnNativeHandle()` 
so that `close()` does not free the native memory. This is critical because 
`AbstractRocksDBStorage.shutdown()` closes `ColumnFamilyOptions` (step 2) 
before closing the DB (step 4) — without  [...]
+
+**3. Shared temp directory for .so resolution**
+
+At runtime, `CqCompactionFilterJni` loads `librocksdbjni-linux64.so` from the 
rocksdbjni JAR first (via `System.loadLibrary` or extraction to a temp dir), 
then extracts `libcq_compaction_filter.so` to the same temp directory. This 
ensures the `$ORIGIN` RPATH in the shim correctly resolves its `NEEDED` 
dependency on `librocksdbjni-linux64.so`. The rocksdbjni native library is NOT 
bundled in the RocketMQ repository — it is sourced from the 
`org.rocksdb:rocksdbjni:8.4.4` JAR at runtime.
+
+**4. Thread-safe minPhyOffset with std::atomic**
+
+The `CqCompactionFilter` uses `std::atomic<int64_t>` with 
`memory_order_relaxed` for `min_phy_offset_`. This is sufficient because there 
is a single writer (Java main thread via JNI) and one reader (compaction 
background thread), and eventual consistency is acceptable — a slightly stale 
threshold only means a few extra entries survive one compaction cycle. This 
replaces the earlier `pthread_mutex` approach, eliminating per-entry 
lock/unlock overhead during full compaction over hundreds o [...]
+
+## Changed files
+
+| File | Change |
+|------|--------|
+| `pom.xml` | `rocksdb.version` → `rocksdbjni.version=8.4.4`; dependency 
changed to `org.rocksdb:rocksdbjni` |
+| `common/pom.xml` | `rocketmq-rocksdb` → `org.rocksdb:rocksdbjni` |
+| `common/.../config/AbstractRocksDBStorage.java` | 
`manualCompactionDefaultCfRange` enhanced with `estimateNumKeys` logging 
(before/after key count, elapsed time, reduction ratio); `manualCompaction` 
removed unused `minPhyOffset` parameter |
+| `store/.../rocksdb/ConsumeQueueCompactionFilterFactory.java` | **Deleted** — 
replaced by native shim |
+| `store/.../rocksdb/ConsumeQueueRocksDBStorage.java` | Use 
`CqCompactionFilterJni.createAndSetFilter()` instead of 
`CompactionFilterFactory`; added `triggerCompactionSync()` and `countEntries()` 
helpers |
+| `store/.../rocksdb/RocksDBOptionsFactory.java` | Remove 
`setCompactionFilterFactory()` call from `createCQCFOptions()` |
+| `store/.../rocksdb/CqCompactionFilterJni.java` | **Rewritten** — uses raw 
JNI pointer + `NativeCqCompactionFilter` wrapper via public API; added 
platform-aware library name detection (macOS `.dylib` / Linux `.so` / Windows 
`.dll`) |
+| `store/.../rocksdb/NativeCqCompactionFilter.java` | **New** — thin 
`AbstractCompactionFilter<Slice>` wrapper with `disOwnNativeHandle()` |
+| `store/.../resources/native/cq_compaction_filter.cpp` | **Rewritten** — 
direct C++ subclassing, explicit linking, `std::atomic` for thread safety |
+| `store/.../resources/native/libcq_compaction_filter.so` | **New** — 
pre-compiled native library (Linux x86_64) |
+| `store/.../resources/native/libcq_compaction_filter.dylib` | **New** — 
pre-compiled native library (macOS arm64) |
+| `store/.../resources/native/cq_compaction_filter.dll` | **New** — 
pre-compiled native library (Windows x86_64, MSVC v14.29) |
+| `store/.../rocksdb/CqCompactionFilterJniTest.java` | **New** — integration 
test for compaction filter |
+
+## Building the native shim
+
+Prerequisites: `g++` / `clang++`, RocksDB C++ headers matching `rocksdbjni` 
version (8.4.4), JNI headers from your JDK.
+
+### Linux (x86_64)
+
+```bash
+# 1. Extract librocksdbjni from the rocksdbjni jar
+ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar
+unzip -j "$ROCKSDB_JAR" librocksdbjni-linux64.so -d /tmp/rocksdb-native/
+
+# 2. Download matching RocksDB headers
+wget https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz
+tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1
+
+# 3. Compile the shim with explicit linking
+export JAVA_HOME=/usr/lib/jvm/java-8   # or your JDK path
+g++ -shared -fPIC -O2 -std=c++17 -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0 \
+    -I./include \
+    -I${JAVA_HOME}/include \
+    -I${JAVA_HOME}/include/linux \
+    -Wl,--no-undefined \
+    -Wl,-rpath,\$ORIGIN \
+    -L/tmp/rocksdb-native \
+    -l:librocksdbjni-linux64.so \
+    -o libcq_compaction_filter.so \
+    store/src/main/resources/native/cq_compaction_filter.cpp
+
+# 4. Verify NEEDED and RPATH
+readelf -d libcq_compaction_filter.so | grep -E "NEEDED|RPATH"
+# Should show: NEEDED librocksdbjni-linux64.so, RPATH $ORIGIN
+
+# 5. Replace the pre-built .so
+cp libcq_compaction_filter.so store/src/main/resources/native/
+```
+
+### macOS (arm64 / x86_64)
+
+On macOS, the rocksdbjni jar uses `.jnilib` extension (not `.so` or bare 
names) and the native library names differ from Linux. Key gotchas:
+- Apple Silicon uses `librocksdbjni-osx-arm64.jnilib` (not 
`librocksdbjni-osx-aarch64` as the filename pattern might suggest)
+- macOS `ld` does NOT support the `-l:` syntax used on Linux — pass the 
`.jnilib` file directly
+- After linking, use `install_name_tool` to fix the absolute install name to 
`@loader_path`, otherwise the shim fails to resolve the rocksdbjni dependency 
at runtime
+- GitHub downloads may be blocked by corporate firewalls; use a mirror (e.g. 
`ghproxy.net`) or a local RocksDB checkout for headers
+
+```bash
+# 1. Extract the macOS native library from rocksdbjni jar
+#    The jar contains librocksdbjni-osx-arm64.jnilib (arm64) or 
librocksdbjni-osx-x86_64.jnilib
+ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar
+mkdir -p /tmp/rocksdb-native
+
+# For Apple Silicon (arm64):
+unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-arm64.jnilib -d /tmp/rocksdb-native/
+
+# For Intel Mac (x86_64):
+unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-x86_64.jnilib -d /tmp/rocksdb-native/
+
+# 2. Download matching RocksDB headers
+#    Use ghproxy.net mirror if github.com is blocked:
+curl -sL 
"https://ghproxy.net/https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz";
 -o /tmp/rocksdb-8.4.4.tar.gz
+tar xzf /tmp/rocksdb-8.4.4.tar.gz -C /tmp rocksdb-8.4.4/include 
--strip-components=1
+# Or use a local RocksDB checkout if available:
+# ROCKSDB_INCLUDE=/path/to/rocksdb/include
+
+# 3. Compile the shim — pass .jnilib directly (macOS ld does NOT support -l: 
syntax)
+export JAVA_HOME=$(/usr/libexec/java_home)
+ROCKSDB_INCLUDE=${ROCKSDB_INCLUDE:-./include}  # adjust to your headers 
location
+ROCKSDB_JNILIB=/tmp/rocksdb-native/librocksdbjni-osx-arm64.jnilib  # or 
-x86_64.jnilib
+clang++ -shared -fPIC -O2 -std=c++17 -fno-rtti \
+    -I"$ROCKSDB_INCLUDE" \
+    -I${JAVA_HOME}/include \
+    -I${JAVA_HOME}/include/darwin \
+    -Wl,-undefined,error \
+    "$ROCKSDB_JNILIB" \
+    -o /tmp/rocksdb-native/libcq_compaction_filter.dylib \
+    store/src/main/resources/native/cq_compaction_filter.cpp
+
+# 4. Fix the install_name to use @loader_path for runtime resolution
+#    Without this, otool -L shows an absolute path to the build directory
+install_name_tool -change "$ROCKSDB_JNILIB" "@loader_path/$(basename 
$ROCKSDB_JNILIB)" \
+    /tmp/rocksdb-native/libcq_compaction_filter.dylib
+
+# 5. Verify dependencies
+otool -L /tmp/rocksdb-native/libcq_compaction_filter.dylib
+# Should show @loader_path/librocksdbjni-osx-arm64.jnilib (or -x86_64.jnilib)
+
+# 6. Place the output
+cp /tmp/rocksdb-native/libcq_compaction_filter.dylib 
store/src/main/resources/native/
+```
+
+### Windows (x86_64)
+
+**⚠ Must use MSVC — MinGW is NOT compatible**
+
+The official `librocksdbjni-win64.dll` is compiled with MSVC. MinGW-w64 
produces incompatible C++ binaries (different vtable layout, name mangling, 
exception handling). Attempting to link a MinGW-compiled shim against the 
MSVC-compiled rocksdbjni DLL will cause undefined symbol errors at link time 
and crashes at runtime.
+
+**Option A: Native MSVC build (required for Windows)**
+
+1. Install Visual Studio Build Tools 2019 (v14.29, matching the rocksdbjni 
linker version 14.29.30159).
+2. Use the x64 Native Tools Command Prompt or set up the environment manually.
+
+```powershell
+# 1. Set up environment (run vcvarsall.bat first, or use the VS Dev Command 
Prompt)
+set "VCTools=C:\Program Files\Microsoft Visual 
Studio\2022\BuildTools\VC\Tools\MSVC\14.29.30133"
+set "SDK=C:\Program Files (x86)\Windows Kits\10"
+set "JAVA_HOME=C:\path\to\jdk8"
+
+# 2. Extract RocksDB headers
+curl -LO https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz
+tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1
+
+# 3. Compile with MSVC cl.exe (must use /GR- to disable RTTI, matching 
rocksdbjni)
+cl.exe /LD /O2 /std:c++17 /GR- /EHsc /utf-8 ^
+   /I"%JAVA_HOME%\include" ^
+   /I"%JAVA_HOME%\include\win32" ^
+   /I"rocksdb-8.4.4\include" ^
+   /I"%VCTools%\include" ^
+   /I"%SDK%\Include\10.0.19041.0\ucrt" ^
+   /I"%SDK%\Include\10.0.19041.0\shared" ^
+   /I"%SDK%\Include\10.0.19041.0\um" ^
+   /Fecq_compaction_filter.dll ^
+   store\src\main\resources\native\cq_compaction_filter.cpp ^
+   /link /MACHINE:X64 ^
+   /LIBPATH:"%VCTools%\lib\x64" ^
+   /LIBPATH:"%SDK%\Lib\10.0.19041.0\ucrt\x64" ^
+   /LIBPATH:"%SDK%\Lib\10.0.19041.0\um\x64"
+
+# 4. Verify exports
+dumpbin /exports cq_compaction_filter.dll
+# Should show: 
Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0
+#             
Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0
+
+# 5. Place the output
+copy cq_compaction_filter.dll store\src\main\resources\native\
+```
+
+> **Note on Git Bash / MSYS2**: When running `cl.exe` from Git Bash, MSYS2's 
automatic path conversion will corrupt `/LD`, `/O2` etc. into `C:/Program/LD` 
etc. Use `MSYS2_ARG_CONV_EXCL='*'` to disable this, or run from `cmd.exe` / 
PowerShell directly.
+
+**Windows build troubleshooting**
+
+| Problem | Cause | Solution |
+|---------|-------|----------|
+| `cl: warning D9024: cannot recognize source file type` | MSYS2/Git Bash 
converts `/LD` to `C:/Program/LD` | Prefix command with 
`MSYS2_ARG_CONV_EXCL='*'` or use `cmd.exe` |
+| `fatal error C1083: cannot open include file 'atomic'` | MSVC C++ headers 
directory not in include path | Add `/I"%VCTools%\include"` (from VS Build 
Tools) |
+| `LNK2019: unresolved external symbol ... Configurable::GetOption` | 
`CompactionFilter` inherits from `Configurable`/`Customizable`, whose virtual 
methods are not exported by `librocksdbjni-win64.dll` | Provide inline stub 
implementations in your `.cpp` for all unexported pure virtual methods |
+| `LNK2019: unresolved external symbol ... 
Status::Status(Code,SubCode,Slice,Slice,Severity)` | 
`Status::NotSupported("msg")` calls the non-inline 5-parameter constructor 
(defined in `status.cc`, not exported by DLL) | Use `return Status();` instead 
of `return Status::NotSupported("...");` — `Status()` default constructor is 
fully inline |
+
+**Option B: Run on WSL (recommended for development)**
+
+Run the entire RocketMQ build and test under WSL (Windows Subsystem for 
Linux). This uses the native Linux toolchain and pre-built `.so` with zero code 
changes:
+
+```bash
+# In WSL (Ubuntu)
+java -version    # should show WSL JDK
+mvn test -pl store -Dtest=CqCompactionFilterJniTest -Djacoco.skip=true
+```
+
+## Platform support
+
+`CqCompactionFilterJni.java` automatically detects the OS and architecture at 
runtime, selecting the correct library name and extension.
+
+| Platform | Library name | Architecture | Status |
+|----------|-------------|--------------|--------|
+| Linux (glibc) | `libcq_compaction_filter.so` | x86_64 | Pre-built |
+| Linux (glibc) | `libcq_compaction_filter_aarch64.so` | aarch64 | Pre-built |
+| macOS | `libcq_compaction_filter.dylib` | arm64 | Pre-built |
+| macOS | `libcq_compaction_filter.dylib` | x86_64 | Requires rebuild |
+| Windows | `cq_compaction_filter.dll` | x86_64 | Pre-built |
+
+## Limitations
+
+1. **Jacoco incompatibility** — The jacoco Java agent can cause native crashes 
when combined with dynamically loaded native libraries. Unit tests should be 
run with `-Djacoco.skip=true` when testing RocksDB functionality.
+
+2. **Global singleton filter** — `CqCompactionFilterJni` stores the native 
filter pointer in a static `AtomicLong NATIVE_FILTER_PTR`. Only one filter 
instance is tracked globally per JVM. If multiple `ConsumeQueueRocksDBStorage` 
instances exist (e.g., in tests or multi-Broker processes), `setMinPhyOffset()` 
always updates the last-created filter. Earlier instances lose their threshold 
updates silently.
+
+3. **C++17 required** — The C++ source uses `std::atomic<int64_t>` which 
requires a C++17-capable compiler. All modern compilers (GCC 7+, Clang 5+, MSVC 
2017+) support this.
+
+4. **Shim depends on rocksdbjni native library at runtime** — The 
`libcq_compaction_filter.so` has a `DT_NEEDED` entry for 
`librocksdbjni-linux64.so` (~13 MB). The `CqCompactionFilterJni` class handles 
this by extracting the shim to the same temp directory as the rocksdbjni native 
library, so the `$ORIGIN` RPATH resolves correctly without requiring 
`LD_LIBRARY_PATH`.
+
+5. **Windows requires MSVC** — `librocksdbjni-win64.dll` is compiled with MSVC 
and does not export C++ base class symbols (`Configurable`/`Customizable` 
vtable methods, `Status` constructors). A MinGW-compiled shim cannot link 
against it. Must use the same MSVC version (v14.29 for rocksdbjni 8.4.4) and 
provide inline stubs for unexported virtual methods.
diff --git a/pom.xml b/pom.xml
index 893e58b496..868faa57d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@
         
<opentelemetry-exporter-prometheus.version>1.47.0-alpha</opentelemetry-exporter-prometheus.version>
         <jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
         <s3.version>2.20.29</s3.version>
-        <rocksdb.version>1.0.6</rocksdb.version>
+        <rocksdbjni.version>8.4.4</rocksdbjni.version>
         <jackson-databind.version>2.13.4.2</jackson-databind.version>
         <sofa-jraft.version>1.3.14</sofa-jraft.version>
 
@@ -761,9 +761,9 @@
                 <version>${slf4j-api.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.apache.rocketmq</groupId>
-                <artifactId>rocketmq-rocksdb</artifactId>
-                <version>${rocksdb.version}</version>
+                <groupId>org.rocksdb</groupId>
+                <artifactId>rocksdbjni</artifactId>
+                <version>${rocksdbjni.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.github.aliyunmq</groupId>
diff --git a/store/pom.xml b/store/pom.xml
index 1600a007e0..70ecafe428 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -71,5 +71,9 @@
             <groupId>io.github.aliyunmq</groupId>
             <artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
deleted file mode 100644
index f19fb9e203..0000000000
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.rocksdb;
-
-import java.util.function.LongSupplier;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.rocksdb.AbstractCompactionFilter;
-import org.rocksdb.AbstractCompactionFilterFactory;
-import org.rocksdb.RemoveConsumeQueueCompactionFilter;
-
-public class ConsumeQueueCompactionFilterFactory extends 
AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
-    private final LongSupplier minPhyOffsetSupplier;
-
-    public ConsumeQueueCompactionFilterFactory(final LongSupplier 
minPhyOffsetSupplier) {
-        this.minPhyOffsetSupplier = minPhyOffsetSupplier;
-    }
-
-    @Override
-    public String name() {
-        return "ConsumeQueueCompactionFilterFactory";
-    }
-
-    @Override
-    public RemoveConsumeQueueCompactionFilter createCompactionFilter(final 
AbstractCompactionFilter.Context context) {
-        long minPhyOffset = this.minPhyOffsetSupplier.getAsLong();
-        LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual: 
{}",
-                minPhyOffset, context.isFullCompaction(), 
context.isManualCompaction());
-        return new RemoveConsumeQueueCompactionFilter(minPhyOffset);
-    }
-}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index 4392283c67..925d8f91c7 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -21,10 +21,14 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.MessageStore;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -33,13 +37,13 @@ import org.rocksdb.WriteBatch;
 
 public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
 
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+
     public static final byte[] OFFSET_COLUMN_FAMILY = 
"offset".getBytes(StandardCharsets.UTF_8);
 
     private final MessageStore messageStore;
     private volatile ColumnFamilyHandle offsetCFHandle;
 
-    private ConsumeQueueCompactionFilterFactory compactionFilterFactory;
-
     public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final 
String dbPath) {
         super(dbPath);
         this.messageStore = messageStore;
@@ -67,20 +71,27 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
 
             final List<ColumnFamilyDescriptor> cfDescriptors = new 
ArrayList<>();
 
-            this.compactionFilterFactory = new 
ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
-
-            ColumnFamilyOptions cqCfOptions = 
RocksDBOptionsFactory.createCQCFOptions(this.messageStore, 
this.compactionFilterFactory);
+            ColumnFamilyOptions cqCfOptions = 
RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
             this.cfOptions.add(cqCfOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
 
             ColumnFamilyOptions offsetCfOptions = 
RocksDBOptionsFactory.createOffsetCFOptions();
             this.cfOptions.add(offsetCfOptions);
             cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, 
offsetCfOptions));
+
+            if (CqCompactionFilterJni.isLoaded()) {
+                CqCompactionFilterJni.createAndSetFilter(cqCfOptions);
+                
CqCompactionFilterJni.setMinPhyOffset(messageStore.getMinPhyOffset());
+                log.info("CqCompactionFilter created and set, minPhyOffset: 
{}", messageStore.getMinPhyOffset());
+            } else {
+                log.warn("CqCompactionFilterJni native library not loaded, 
compaction filter will not be installed");
+            }
+
             open(cfDescriptors);
             this.defaultCFHandle = cfHandles.get(0);
             this.offsetCFHandle = cfHandles.get(1);
         } catch (final Exception e) {
-            LOGGER.error("postLoad Failed. {}", this.dbPath, e);
+            log.error("postLoad Failed. {}", this.dbPath, e);
             return false;
         }
         return true;
@@ -91,11 +102,6 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
         if (this.offsetCFHandle != null) {
             this.offsetCFHandle.close();
         }
-
-        if (this.compactionFilterFactory != null) {
-            this.compactionFilterFactory.close();
-        }
-
     }
 
     public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -116,10 +122,13 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
     }
 
     public void manualCompaction(final long minPhyOffset) {
+        if (CqCompactionFilterJni.isLoaded()) {
+            CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+        }
         try {
-            manualCompaction(minPhyOffset, this.compactRangeOptions);
+            super.manualCompaction(this.compactRangeOptions);
         } catch (Exception e) {
-            LOGGER.error("manualCompaction Failed. minPhyOffset: {}", 
minPhyOffset, e);
+            log.error("manualCompaction Failed. minPhyOffset: {}", 
minPhyOffset, e);
         }
     }
 
@@ -130,4 +139,41 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
     public ColumnFamilyHandle getOffsetCFHandle() {
         return this.offsetCFHandle;
     }
+
+    /**
+     * Synchronously trigger compaction with an updated compaction filter 
threshold.
+     * This method updates the native compaction filter's minPhyOffset and then
+     * performs a full compaction on the default column family.
+     */
+    public void triggerCompactionSync(long minPhyOffset) throws 
RocksDBException {
+        if (CqCompactionFilterJni.isLoaded()) {
+            CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+        }
+        db.compactRange(this.defaultCFHandle);
+    }
+
+    /**
+     * Flush all memtables to SST files.
+     */
+    public void flushAll() throws RocksDBException {
+        try (FlushOptions flushOpts = new FlushOptions()) {
+            flushOpts.setWaitForFlush(true);
+            flush(flushOpts);
+        }
+    }
+
+    /**
+     * Count all entries in the default column family by iterating. O(N), use 
only in tests.
+     */
+    public long countEntries() {
+        long count = 0;
+        try (RocksIterator iter = db.newIterator(this.defaultCFHandle)) {
+            iter.seekToFirst();
+            while (iter.isValid()) {
+                count++;
+                iter.next();
+            }
+        }
+        return count;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
new file mode 100644
index 0000000000..69d74e3364
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.rocksdb.ColumnFamilyOptions;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class CqCompactionFilterJni {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+
+    private static final AtomicLong NATIVE_FILTER_PTR = new AtomicLong(0);
+    private static volatile boolean loaded = false;
+
+    /** Platform-specific shim library name and extension. */
+    private static final String SHIM_LIB_NAME;
+    private static final String SHIM_LIB_EXTENSION;
+    private static final String ROCKSDB_JNI_LIB_NAME;
+
+    static {
+        String os = System.getProperty("os.name").toLowerCase();
+        String arch = System.getProperty("os.arch");
+        if (os.contains("mac") || os.contains("darwin") || os.contains("osx")) 
{
+            SHIM_LIB_NAME = "libcq_compaction_filter.dylib";
+            SHIM_LIB_EXTENSION = ".dylib";
+            ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || 
arch.contains("arm")
+                ? "librocksdbjni-osx-aarch64"
+                : "librocksdbjni-osx-x86_64";
+        } else if (os.contains("win")) {
+            SHIM_LIB_NAME = "cq_compaction_filter.dll";
+            SHIM_LIB_EXTENSION = ".dll";
+            ROCKSDB_JNI_LIB_NAME = "librocksdbjni-win64.dll";
+        } else {
+            SHIM_LIB_NAME = arch.contains("aarch") || arch.contains("arm")
+                ? "libcq_compaction_filter_aarch64.so"
+                : "libcq_compaction_filter.so";
+            SHIM_LIB_EXTENSION = ".so";
+            ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || 
arch.contains("arm")
+                ? "librocksdbjni-linux-aarch64.so"
+                : "librocksdbjni-linux64.so";
+        }
+    }
+
+    static {
+        loadNativeShim();
+    }
+
+    private static synchronized void loadNativeShim() {
+        if (loaded) {
+            return;
+        }
+
+        // Preload RocksDB's native library so that linked symbols are 
available
+        // when our compaction filter shim is loaded.
+        String rocksdbDir = ensureRocksDBNativeLoaded();
+
+        String libName = SHIM_LIB_NAME;
+        try (InputStream is = CqCompactionFilterJni.class
+            .getClassLoader().getResourceAsStream("native/" + libName)) {
+            if (is == null) {
+                log.error("[CqCompactionFilterJni] Native library '{}' not 
found on classpath", libName);
+                return;
+            }
+            File tempLib;
+            if (rocksdbDir != null) {
+                // Extract our shim to the same temp directory as the RocksDB 
JNI library,
+                // so that the DT_NEEDED / LC_LOAD_DYLIB dependency can be 
resolved.
+                tempLib = new File(rocksdbDir, libName);
+            } else {
+                // RocksDB was loaded from java.library.path; our shim can go 
anywhere.
+                tempLib = File.createTempFile("cq_compaction_filter_", 
SHIM_LIB_EXTENSION);
+            }
+            Files.copy(is, tempLib.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            tempLib.deleteOnExit();
+            System.load(tempLib.getAbsolutePath());
+            loaded = true;
+            log.info("[CqCompactionFilterJni] Native library loaded from 
classpath: {}", tempLib.getAbsolutePath());
+        } catch (IOException e) {
+            log.error("[CqCompactionFilterJni] Failed to load native shim", e);
+        }
+    }
+
+    /**
+     * Returns whether the native compaction filter shim was successfully 
loaded.
+     */
+    public static boolean isLoaded() {
+        return loaded;
+    }
+
+    /**
+     * Locates and loads the RocksDB native JNI library, returning the 
temporary
+     * directory in which it was extracted (or null if loaded from 
java.library.path).
+     * <p>
+     * This method deliberately uses {@code System.loadLibrary("rocksdbjni")}
+     * rather than {@code RocksDB.loadLibrary()} for the following reasons:
+     * <ol>
+     * <li><b>Avoid unnecessary side effects</b> — {@code 
RocksDB.loadLibrary()}
+     *     iterates over all compression types (snappy, lz4, zstd, bzip2, etc.)
+     *     and attempts to load each one. Those libraries are not needed by 
this
+     *     compaction filter, and the resulting {@code UnsatisfiedLinkError}s 
slow
+     *     down startup and pollute logs.</li>
+     * <li><b>Control the temp directory location</b> — The caller needs to 
know
+     *     the directory where the native JNI library was extracted so that
+     *     {@code libcq_compaction_filter.so} can be placed alongside it. This 
is
+     *     required for the dynamic linker to resolve the {@code DT_NEEDED}
+     *     dependency of the custom shim. {@code RocksDB.loadLibrary()} 
extracts
+     *     to an internal temp directory that is not exposed to callers.</li>
+     * <li><b>Avoid class-loading coupling</b> — {@code RocksDB.loadLibrary()}
+     *     triggers the full initialization chain of the rocksdbjni Java 
bindings
+     *     (including {@code CompressionType.values()} iteration and a 
singleton
+     *     {@code NativeLibraryLoader} state machine). Loading the custom shim
+     *     must complete before any RocksDB Java classes are exercised, to 
avoid
+     *     native symbol resolution race conditions.</li>
+     * </ol>
+     *
+     * @return the absolute path of the temporary directory containing the
+     *         extracted RocksDB JNI library, or null if the library was loaded
+     *         from {@code java.library.path} (in which case no temp directory
+     *         is needed for the shim).
+     */
+    private static String ensureRocksDBNativeLoaded() {
+        // Try System.loadLibrary first (works if on java.library.path)
+        try {
+            System.loadLibrary("rocksdbjni");
+            // No temp dir needed since it's on java.library.path
+            return null;
+        } catch (UnsatisfiedLinkError ignored) {
+            // Not on java.library.path, try from JAR
+        }
+
+        // Determine the platform-specific JNI library name from RocksDB's 
Environment
+        String jniLibName;
+        try {
+            jniLibName = 
org.rocksdb.util.Environment.getJniLibraryFileName("rocksdb");
+        } catch (Exception e) {
+            jniLibName = ROCKSDB_JNI_LIB_NAME;
+        }
+
+        try (InputStream is = 
CqCompactionFilterJni.class.getClassLoader().getResourceAsStream(jniLibName)) {
+            if (is == null) {
+                log.error("[CqCompactionFilterJni] RocksDB native library '{}' 
not found on classpath", jniLibName);
+                return null;
+            }
+            // Create a temp directory and extract the library there.
+            // Our shim will be placed in the same directory so the DT_NEEDED
+            // dependency resolves correctly.
+            File tempDir = 
Files.createTempDirectory("rocksdb-native").toFile();
+            tempDir.deleteOnExit();
+            File tempLib = new File(tempDir, jniLibName);
+            Files.copy(is, tempLib.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            tempLib.deleteOnExit();
+            System.load(tempLib.getAbsolutePath());
+            return tempDir.getAbsolutePath();
+        } catch (IOException e) {
+            log.error("[CqCompactionFilterJni] Failed to extract RocksDB 
native library", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create a native CqCompactionFilter instance.
+     * Returns the raw C++ pointer as a jlong.
+     */
+    public static native long createNativeFilter0();
+
+    /**
+     * Update the minPhyOffset threshold on an existing native filter.
+     */
+    public static native void setMinPhyOffset0(long filterPtr, long 
minPhyOffset);
+
+    /**
+     * Set the native compaction filter on the ColumnFamilyOptions via the
+     * public {@code setCompactionFilter} API.
+     * <p>
+     * The wrapper uses {@code disOwnNativeHandle()} so that closing the
+     * ColumnFamilyOptions does not free the native filter — this prevents
+     * use-after-free when AbstractRocksDBStorage closes options before the DB.
+     */
+    public static void setNativeFilter(ColumnFamilyOptions options, long 
filterPtr) {
+        NativeCqCompactionFilter filter = new 
NativeCqCompactionFilter(filterPtr);
+        options.setCompactionFilter(filter);
+    }
+
+    /**
+     * Create the native filter and set it on the ColumnFamilyOptions.
+     * Returns the native pointer for later threshold updates.
+     */
+    @SuppressWarnings("UnusedReturnValue")
+    public static long createAndSetFilter(ColumnFamilyOptions options) {
+        long ptr = createNativeFilter0();
+        NATIVE_FILTER_PTR.set(ptr);
+        setNativeFilter(options, ptr);
+        return ptr;
+    }
+
+    /**
+     * Update the minPhyOffset on the current native filter.
+     */
+    public static void setMinPhyOffset(long minPhyOffset) {
+        long ptr = NATIVE_FILTER_PTR.get();
+        if (ptr != 0) {
+            setMinPhyOffset0(ptr, minPhyOffset);
+            log.info("CqCompactionFilter setMinPhyOffset={}", minPhyOffset);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
new file mode 100644
index 0000000000..6a3101c261
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.Slice;
+
+/**
+ * Thin Java wrapper around a native CqCompactionFilter C++ pointer.
+ * <p>
+ * The native filter is allocated by {@link 
CqCompactionFilterJni#createNativeFilter0()}
+ * and its lifetime is managed externally (it lives for the entire JVM 
session).
+ * {@link #disOwnNativeHandle()} is called so that {@code close()} does not
+ * free the native memory — this is critical because {@code 
AbstractRocksDBStorage}
+ * closes {@code ColumnFamilyOptions} (which closes this filter) before closing
+ * the DB, while background compaction threads may still reference the filter.
+ */
+class NativeCqCompactionFilter extends AbstractCompactionFilter<Slice> {
+
+    NativeCqCompactionFilter(long nativeHandle) {
+        super(nativeHandle);
+        disOwnNativeHandle();
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index b74cf8c85d..37eec67d35 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -41,8 +41,7 @@ import org.rocksdb.util.SizeUnit;
 
 public class RocksDBOptionsFactory {
 
-    public static ColumnFamilyOptions createCQCFOptions(final MessageStore 
messageStore,
-        ConsumeQueueCompactionFilterFactory 
consumeQueueCompactionFilterFactory) {
+    public static ColumnFamilyOptions createCQCFOptions(final MessageStore 
messageStore) {
         BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig().
                 setFormatVersion(5).
                 setIndexType(IndexType.kBinarySearch).
@@ -93,7 +92,6 @@ public class RocksDBOptionsFactory {
                 setTargetFileSizeBase(256 * SizeUnit.MB).
                 setTargetFileSizeMultiplier(2).
                 setMergeOperator(new StringAppendOperator()).
-                
setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
                 setReportBgIoStats(true).
                 setOptimizeFiltersForHits(true);
     }
diff --git a/store/src/main/resources/native/cq_compaction_filter.cpp 
b/store/src/main/resources/native/cq_compaction_filter.cpp
new file mode 100644
index 0000000000..1d0bd84cd9
--- /dev/null
+++ b/store/src/main/resources/native/cq_compaction_filter.cpp
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Native compaction filter for ConsumeQueue entries.
+ *
+ * Subclass rocksdb::CompactionFilter directly, create instances in C++,
+ * and pass the raw C++ pointer as a jlong to Java. Java's
+ * AbstractCompactionFilter(nativeHandle) wraps it seamlessly.
+ *
+ * All rocksdb symbols are declared weak so they resolve at runtime to the
+ * symbols already loaded by the JVM's ClassLoader.
+ */
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+
+/* ------------------------------------------------------------------ */
+/* Windows stub implementations                                       */
+/*                                                                    */
+/* On Linux/macOS, ELF/Mach-O shared libraries export all symbols by  */
+/* default, so the shim resolves inherited virtual methods from       */
+/* librocksdbjni at link time. On Windows, DLLs only export symbols   */
+/* marked __declspec(dllexport) — rocksdbjni only exports JNI entry   */
+/* points, not internal C++ class methods. We must provide stub       */
+/* implementations for the Configurable/Customizable virtual methods  */
+/* that appear in CompactionFilter's vtable. These stubs are never    */
+/* called at runtime (RocksDB only invokes Filter() and Name() on     */
+/* compaction filters), but the linker needs addresses for them.      */
+/* ------------------------------------------------------------------ */
+
+#ifdef _WIN32
+
+#include "rocksdb/configurable.h"
+#include "rocksdb/customizable.h"
+#include <unordered_map>
+#include <unordered_set>
+
+namespace rocksdb {
+
+struct ConfigOptions;
+struct DBOptions;
+struct ColumnFamilyOptions;
+class OptionTypeInfo;
+
+// --- Configurable virtual methods (defined in options/configurable.cc) ---
+
+Status Configurable::GetOption(const ConfigOptions&, const std::string&,
+                               std::string*) const {
+    return Status();
+}
+
+bool Configurable::AreEquivalent(const ConfigOptions&, const Configurable*,
+                                 std::string*) const {
+    return true;
+}
+
+Status Configurable::PrepareOptions(const ConfigOptions&) {
+    return Status();
+}
+
+Status Configurable::ValidateOptions(const DBOptions&,
+                                     const ColumnFamilyOptions&) const {
+    return Status();
+}
+
+const void* Configurable::GetOptionsPtr(const std::string&) const {
+    return nullptr;
+}
+
+Status Configurable::ParseStringOptions(const ConfigOptions&,
+                                        const std::string&) {
+    return Status();
+}
+
+Status Configurable::ConfigureOptions(
+    const ConfigOptions&,
+    const std::unordered_map<std::string, std::string>&,
+    std::unordered_map<std::string, std::string>*) {
+    return Status();
+}
+
+Status Configurable::ParseOption(const ConfigOptions&, const OptionTypeInfo&,
+                                 const std::string&, const std::string&,
+                                 void*) {
+    return Status();
+}
+
+bool Configurable::OptionsAreEqual(const ConfigOptions&, const OptionTypeInfo&,
+                                   const std::string&, const void*,
+                                   const void*, std::string*) const {
+    return true;
+}
+
+std::string Configurable::SerializeOptions(const ConfigOptions&,
+                                           const std::string&) const {
+    return "";
+}
+
+std::string Configurable::GetOptionName(const std::string& name) const {
+    return name;
+}
+
+// Non-virtual, but referenced by inline code paths
+void Configurable::RegisterOptions(const std::string&, void*,
+    const std::unordered_map<std::string, OptionTypeInfo>*) {}
+
+Status Configurable::ConfigureFromMap(
+    const ConfigOptions&,
+    const std::unordered_map<std::string, std::string>&) {
+    return Status();
+}
+
+Status Configurable::ConfigureFromMap(
+    const ConfigOptions&,
+    const std::unordered_map<std::string, std::string>&,
+    std::unordered_map<std::string, std::string>*) {
+    return Status();
+}
+
+Status Configurable::ConfigureOption(const ConfigOptions&, const std::string&,
+                                     const std::string&) {
+    return Status();
+}
+
+Status Configurable::ConfigureFromString(const ConfigOptions&,
+                                         const std::string&) {
+    return Status();
+}
+
+Status Configurable::GetOptionString(const ConfigOptions&,
+                                     std::string*) const {
+    return Status();
+}
+
+std::string Configurable::ToString(const ConfigOptions&,
+                                   const std::string&) const {
+    return "";
+}
+
+Status Configurable::GetOptionNames(const ConfigOptions&,
+                                    std::unordered_set<std::string>*) const {
+    return Status();
+}
+
+Status Configurable::GetOptionsMap(
+    const std::string&, const std::string&, std::string*,
+    std::unordered_map<std::string, std::string>*) {
+    return Status();
+}
+
+// --- Customizable virtual/override methods (defined in 
options/customizable.cc) ---
+
+Status Customizable::GetOption(const ConfigOptions&, const std::string&,
+                               std::string*) const {
+    return Status();
+}
+
+bool Customizable::AreEquivalent(const ConfigOptions&, const Configurable*,
+                                 std::string*) const {
+    return true;
+}
+
+std::string Customizable::GetOptionName(const std::string& name) const {
+    return name;
+}
+
+std::string Customizable::SerializeOptions(const ConfigOptions&,
+                                           const std::string&) const {
+    return "";
+}
+
+std::string Customizable::GenerateIndividualId() const {
+    return "stub";
+}
+
+Status Customizable::GetOptionsMap(
+    const ConfigOptions&, const Customizable*, const std::string&,
+    std::string*, std::unordered_map<std::string, std::string>*) {
+    return Status();
+}
+
+Status Customizable::ConfigureNewObject(
+    const ConfigOptions&, Customizable*,
+    const std::unordered_map<std::string, std::string>&) {
+    return Status();
+}
+
+// --- Status methods (defined in util/status.cc) ---
+
+Status::Status(Code _code, SubCode _subcode, const Slice& msg,
+               const Slice& msg2, Severity sev)
+    : code_(_code), subcode_(_subcode), sev_(sev),
+      retryable_(false), data_loss_(false), scope_(0) {}
+
+std::unique_ptr<const char[]> Status::CopyState(const char* s) {
+    if (s == nullptr) return nullptr;
+    const size_t n = std::strlen(s) + 1;
+    char* result = new char[n];
+    std::memcpy(result, s, n);
+    return std::unique_ptr<const char[]>(result);
+}
+
+std::string Status::ToString() const {
+    return "OK";
+}
+
+}  // namespace rocksdb
+
+#endif  // _WIN32
+
+/* ------------------------------------------------------------------ */
+/* Our concrete compaction filter                                     */
+/* ------------------------------------------------------------------ */
+
+class CqCompactionFilter : public rocksdb::CompactionFilter {
+public:
+    const char* Name() const override {
+        return "ConsumeQueueCompactionFilter";
+    }
+
+    bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
+                const rocksdb::Slice& existing_value, std::string* 
/*new_value*/,
+                bool* /*value_changed*/) const override {
+        static const int CQ_MIN_SIZE = 28;
+        if (existing_value.size() < static_cast<size_t>(CQ_MIN_SIZE)) {
+            return false;
+        }
+        const unsigned char* data =
+            reinterpret_cast<const unsigned char*>(existing_value.data());
+        int64_t phy_offset =
+            (static_cast<int64_t>(data[0]) << 56) |
+            (static_cast<int64_t>(data[1]) << 48) |
+            (static_cast<int64_t>(data[2]) << 40) |
+            (static_cast<int64_t>(data[3]) << 32) |
+            (static_cast<int64_t>(data[4]) << 24) |
+            (static_cast<int64_t>(data[5]) << 16) |
+            (static_cast<int64_t>(data[6]) << 8) |
+            (static_cast<int64_t>(data[7]));
+
+        int64_t min_offset = min_phy_offset_.load(std::memory_order_relaxed);
+        return phy_offset < min_offset;
+    }
+
+    void SetMinPhyOffset(int64_t offset) {
+        min_phy_offset_.store(offset, std::memory_order_relaxed);
+    }
+
+private:
+    std::atomic<int64_t> min_phy_offset_{0};
+};
+
+/* ------------------------------------------------------------------ */
+/* JNI bindings                                                       */
+/* ------------------------------------------------------------------ */
+
+#include <jni.h>
+
+extern "C" {
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0(
+    JNIEnv* env, jclass clazz) {
+    CqCompactionFilter* filter = new CqCompactionFilter();
+    return reinterpret_cast<jlong>(filter);
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0(
+    JNIEnv* env, jclass clazz, jlong filterPtr, jlong minPhyOffset) {
+    CqCompactionFilter* filter = 
reinterpret_cast<CqCompactionFilter*>(filterPtr);
+    filter->SetMinPhyOffset(minPhyOffset);
+}
+
+} // extern "C"
diff --git a/store/src/main/resources/native/cq_compaction_filter.dll 
b/store/src/main/resources/native/cq_compaction_filter.dll
new file mode 100755
index 0000000000..2dc74834f4
Binary files /dev/null and 
b/store/src/main/resources/native/cq_compaction_filter.dll differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.dylib 
b/store/src/main/resources/native/libcq_compaction_filter.dylib
new file mode 100755
index 0000000000..58d6e6796a
Binary files /dev/null and 
b/store/src/main/resources/native/libcq_compaction_filter.dylib differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.so 
b/store/src/main/resources/native/libcq_compaction_filter.so
new file mode 100755
index 0000000000..46dabe1880
Binary files /dev/null and 
b/store/src/main/resources/native/libcq_compaction_filter.so differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter_aarch64.so 
b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so
new file mode 100755
index 0000000000..b77869f063
Binary files /dev/null and 
b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so differ
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
new file mode 100644
index 0000000000..eead66b074
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.WriteBatch;
+
+public class CqCompactionFilterJniTest {
+
+    private static final int TOPIC_COUNT = 100;
+    private static final int BATCH_SIZE = 100_000;
+    private static final int MSG_SIZE = 1000;
+
+    private static final byte CTRL_1 = '\u0001';
+    private ConsumeQueueRocksDBStorage storage;
+
+    @Before
+    public void setUp() throws Exception {
+        Assume.assumeTrue("CqCompactionFilterJni native library must be 
loaded", CqCompactionFilterJni.isLoaded());
+        String dbPath = Files.createTempDirectory("rocksdb-cq-compaction-" + 
UUID.randomUUID()).toString();
+        MessageStore mockStore = Mockito.mock(MessageStore.class);
+        Mockito.when(mockStore.getMinPhyOffset()).thenReturn(0L);
+        Mockito.when(mockStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
+        storage = new ConsumeQueueRocksDBStorage(mockStore, dbPath);
+    }
+
+    @After
+    public void tearDown() {
+        if (storage != null) {
+            storage.shutdown();
+            storage.destroy();
+        }
+    }
+
+    @Test
+    public void testCreateAndSetFilter() {
+        Assert.assertTrue("Native library should be loaded", 
CqCompactionFilterJni.isLoaded());
+
+        long ptr = CqCompactionFilterJni.createNativeFilter0();
+        Assert.assertTrue("Native filter pointer should be non-zero", ptr != 
0);
+
+        CqCompactionFilterJni.setMinPhyOffset0(ptr, 1000);
+        CqCompactionFilterJni.setMinPhyOffset0(ptr, Long.MAX_VALUE);
+
+        try (ColumnFamilyOptions options = new ColumnFamilyOptions()) {
+            CqCompactionFilterJni.setNativeFilter(options, ptr);
+        }
+    }
+
+    @Test
+    public void testCompactionFilter_small() throws Exception {
+        runCompactionTest(1_000_000);
+    }
+
+    @Test
+    public void testCompactionFilter_large() throws Exception {
+        runCompactionTest(10_000_000);
+    }
+
+    private void runCompactionTest(int totalEntries) throws Exception {
+        long start = System.currentTimeMillis();
+        boolean result = storage.start();
+        if (!result) {
+            System.err.println("storage.start() returned false. Check ERROR 
logs above for details.");
+        }
+        Assert.assertTrue("ConsumeQueueRocksDBStorage failed to start", 
result);
+        log("Startup took %d ms", System.currentTimeMillis() - start);
+
+        // Phase 1: Write entries
+        start = System.currentTimeMillis();
+        writeEntries(totalEntries);
+        long writeTime = System.currentTimeMillis() - start;
+        log("Wrote %d entries in %d ms (%.0f entries/sec)", totalEntries, 
writeTime, totalEntries * 1000.0 / writeTime);
+
+        // Phase 2: Count entries before compaction
+        start = System.currentTimeMillis();
+        long countBefore = storage.countEntries();
+        long countTime = System.currentTimeMillis() - start;
+        log("Count before compaction: %d (took %d ms)", countBefore, 
countTime);
+        Assert.assertEquals("Entry count should match total written", 
totalEntries, countBefore);
+
+        // Flush memtables to SST files so compaction has something to process
+        start = System.currentTimeMillis();
+        storage.flushAll();
+        log("Flush took %d ms", System.currentTimeMillis() - start);
+
+        // Phase 3: Set minPhyOffset at midpoint and trigger compaction
+        long minPhyOffset = (long) (totalEntries / 2.0) * MSG_SIZE;
+        start = System.currentTimeMillis();
+        storage.triggerCompactionSync(minPhyOffset);
+        long compactTime = System.currentTimeMillis() - start;
+        log("Compaction with minPhyOffset=%d took %d ms", minPhyOffset, 
compactTime);
+
+        // Phase 4: Count entries after compaction
+        start = System.currentTimeMillis();
+        long countAfter = storage.countEntries();
+        countTime = System.currentTimeMillis() - start;
+        log("Count after compaction: %d (took %d ms)", countAfter, countTime);
+
+        // Verify: approximately half the entries should remain
+        long expectedSurvivors = totalEntries - totalEntries / 2;
+        long tolerance = Math.max(expectedSurvivors / 100, 100);
+        Assert.assertTrue(
+            "Expected ~" + expectedSurvivors + " entries after compaction, but 
got " + countAfter,
+            countAfter >= expectedSurvivors - tolerance && countAfter <= 
expectedSurvivors + tolerance
+        );
+
+        log("Test passed: %d -> %d entries (expected ~%d)", totalEntries, 
countAfter, expectedSurvivors);
+    }
+
+    private void writeEntries(int totalEntries) throws Exception {
+        int entriesPerTopic = totalEntries / TOPIC_COUNT;
+
+        for (int t = 0; t < TOPIC_COUNT; t++) {
+            String topic = "test-topic-" + t;
+            byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+            int queueId = 0;
+
+            try (WriteBatch batch = new WriteBatch()) {
+                for (int i = 0; i < entriesPerTopic; i++) {
+                    int globalIndex = t * entriesPerTopic + i;
+
+                    // Key: 
[topic_len:4][CTRL_1][topic][CTRL_1][queue_id:4][CTRL_1][cq_offset:8]
+                    int keyLen = Integer.BYTES + 1 + topicBytes.length + 1 + 
Integer.BYTES + 1 + Long.BYTES;
+                    ByteBuffer keyBB = ByteBuffer.allocate(keyLen);
+                    keyBB.putInt(topicBytes.length)
+                        .put(CTRL_1)
+                        .put(topicBytes)
+                        .put(CTRL_1)
+                        .putInt(queueId)
+                        .put(CTRL_1)
+                        .putLong(i);
+
+                    // Value: 
[phy_offset:8][msg_size:4][tags_code:8][store_timestamp:8] (28 bytes)
+                    long phyOffset = (long) globalIndex * MSG_SIZE;
+                    ByteBuffer valueBB = ByteBuffer.allocate(28);
+                    valueBB.putLong(phyOffset)
+                        .putInt(MSG_SIZE)
+                        .putLong(0)
+                        .putLong(System.currentTimeMillis());
+
+                    batch.put(storage.getDefaultCFHandle(), keyBB.array(), 
valueBB.array());
+
+                    if ((i + 1) % BATCH_SIZE == 0) {
+                        storage.batchPut(batch);
+                    }
+                }
+                if (entriesPerTopic % BATCH_SIZE != 0) {
+                    storage.batchPut(batch);
+                }
+            }
+        }
+    }
+
+    private void log(String format, Object... args) {
+        System.out.printf("[CqCompactionFilterJniTest] " + format + "%n", 
args);
+    }
+}

Reply via email to