This is an automated email from the ASF dual-hosted git repository.
lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 61c493c068 [ISSUE #10334] Support native CqCompactionFilter with
cross-platform JNI shim (#10335)
61c493c068 is described below
commit 61c493c068db333c268c554f5515f2ffc975a0b1
Author: lizhimins <[email protected]>
AuthorDate: Thu May 21 10:12:14 2026 +0800
[ISSUE #10334] Support native CqCompactionFilter with cross-platform JNI
shim (#10335)
---
.github/workflows/maven.yaml | 8 +-
common/pom.xml | 4 +-
.../common/config/AbstractRocksDBStorage.java | 23 +-
docs/en/Native_RocksDB.md | 290 ++++++++++++++++++++
pom.xml | 8 +-
store/pom.xml | 4 +
.../ConsumeQueueCompactionFilterFactory.java | 47 ----
.../store/rocksdb/ConsumeQueueRocksDBStorage.java | 72 ++++-
.../store/rocksdb/CqCompactionFilterJni.java | 228 ++++++++++++++++
.../store/rocksdb/NativeCqCompactionFilter.java | 38 +++
.../store/rocksdb/RocksDBOptionsFactory.java | 4 +-
.../main/resources/native/cq_compaction_filter.cpp | 294 +++++++++++++++++++++
.../main/resources/native/cq_compaction_filter.dll | Bin 0 -> 136192 bytes
.../resources/native/libcq_compaction_filter.dylib | Bin 0 -> 41776 bytes
.../resources/native/libcq_compaction_filter.so | Bin 0 -> 23848 bytes
.../native/libcq_compaction_filter_aarch64.so | Bin 0 -> 74824 bytes
.../store/rocksdb/CqCompactionFilterJniTest.java | 184 +++++++++++++
17 files changed, 1129 insertions(+), 75 deletions(-)
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 4eacd65b84..aaf165420c 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -27,7 +27,13 @@ jobs:
distribution: "corretto"
cache: "maven"
- name: Build with Maven
- run: mvn -B package --file pom.xml
+ shell: bash
+ run: |
+ JACOCO_FLAG=""
+ if [ "${{ matrix.os }}" != "ubuntu-latest" ]; then
+ JACOCO_FLAG="-Djacoco.skip=true"
+ fi
+ mvn -B package --file pom.xml $JACOCO_FLAG
- name: Upload Auth JVM crash logs
if: failure()
diff --git a/common/pom.xml b/common/pom.xml
index eec7118414..b931afcb89 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -113,8 +113,8 @@
<artifactId>rocketmq-logback-classic</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-rocksdb</artifactId>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
</dependency>
</dependencies>
</project>
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index bc4a18006f..4875ce43e2 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -427,22 +427,35 @@ public abstract class AbstractRocksDBStorage {
if (!hold()) {
return;
}
- long s1 = System.currentTimeMillis();
+ long before = getEstimateNumKeys();
+ long startMs = System.currentTimeMillis();
boolean result = true;
try {
- LOGGER.info("manualCompaction Start. {}", this.dbPath);
+ LOGGER.info("ManualCompaction started, dbPath={},
estimateNumKeys={}", this.dbPath, before);
this.db.compactRange(this.defaultCFHandle, null, null,
compactRangeOptions);
} catch (RocksDBException e) {
result = false;
scheduleReloadRocksdb(e);
- LOGGER.error("manualCompaction Failed. {}, {}", this.dbPath,
getStatusError(e));
+ LOGGER.error("ManualCompaction failed, dbPath={}, error={}",
this.dbPath, getStatusError(e));
} finally {
release();
- LOGGER.info("manualCompaction End. {}, rt: {}(ms), result: {}",
this.dbPath, System.currentTimeMillis() - s1, result);
+ long after = getEstimateNumKeys();
+ long elapsed = System.currentTimeMillis() - startMs;
+ String ratio = before > 0 ? String.format("%.1f", (1.0 - (double)
after / before) * 100) : "0.0";
+ LOGGER.info("ManualCompaction finished, dbPath={}, elapsed={}ms,
success={}, before={}, after={}, reduced={}%",
+ this.dbPath, elapsed, result, before, after, ratio);
}
}
- protected void manualCompaction(long minPhyOffset, final
CompactRangeOptions compactRangeOptions) {
+ private long getEstimateNumKeys() {
+ try {
+ return this.db.getLongProperty(this.defaultCFHandle,
"rocksdb.estimate-num-keys");
+ } catch (RocksDBException e) {
+ return -1L;
+ }
+ }
+
+ protected void manualCompaction(final CompactRangeOptions
compactRangeOptions) {
this.manualCompactionThread.submit(new Runnable() {
@Override
public void run() {
diff --git a/docs/en/Native_RocksDB.md b/docs/en/Native_RocksDB.md
new file mode 100644
index 0000000000..f0d43a8137
--- /dev/null
+++ b/docs/en/Native_RocksDB.md
@@ -0,0 +1,290 @@
+# Native RocksDB ConsumeQueue Compaction Filter
+
+## Background
+
+RocketMQ previously depended on a custom-forked RocksDB Java binding published
as `org.apache.rocketmq:rocketmq-rocksdb:1.0.6`. This fork was maintained in
the `apache/rocketmq-externals` repository and was essentially a republished
copy of `org.rocksdb:rocksdbjni` with exactly **one** additional class:
+
+- `org.rocksdb.RemoveConsumeQueueCompactionFilter` — a RocksDB compaction
filter that removes stale consume queue entries during compaction. Its C++
implementation and JNI glue lived in the forked C++ source tree under
`utilities/compaction_filters/remove_consumequeue_compactionfilter.*` and
`java/rocksjni/remove_consumequeue_compactionfilterjni.cc`.
+
+All other RocketMQ subsystems using RocksDB (Pop consumption state, config
storage, index storage, timer storage, transaction half-message storage) used
only standard RocksDB Java APIs and had no dependency on the fork's custom code.
+
+## Problem
+
+Maintaining a fork of RocksDB's Java bindings has several costs:
+
+1. **Upgrade friction** — every RocksDB upstream release requires rebuilding
the entire fork to pick up the new native library and Java API
+2. **Native build complexity** — the fork bundles a full C++ build pipeline
for multiple platforms (Linux glibc/musl, macOS, Windows)
+3. **Dependency duplication** — the `rocksdb/` module in the RocketMQ source
tree duplicates ~190 classes that are identical to upstream `rocksdbjni`
+4. **License ambiguity** — the fork republishes Facebook's RocksDB code under
the Apache group
+
+## Solution
+
+Replace `rocketmq-rocksdb` with the official `org.rocksdb:rocksdbjni:8.4.4`
and move the single custom compaction filter into a standalone native shim.
+
+### Why a native shim is needed
+
+The official `rocksdbjni` provides a
`ColumnFamilyOptions.setCompactionFilter(AbstractCompactionFilter)` method, but
its `AbstractCompactionFilter` Java class requires a native handle (raw
`rocksdb::CompactionFilter*` pointer) passed to its constructor. The Java
`filter()` method callback goes through a C++ trampoline that RocksDB's JNI
layer manages internally — you can only subclass it from within the same JNI
compilation unit.
+
+To implement a custom compaction filter outside the `rocksdbjni` build, we
create a standalone C++ shared library that:
+- Directly subclasses `rocksdb::CompactionFilter` in C++
+- Exposes JNI methods to create filter instances and update the `minPhyOffset`
threshold
+- Returns the raw `CompactionFilter*` pointer as a `jlong` to Java
+
+### Architecture
+
+```
+┌──────────────────────────────────────────────────────┐
+│ ConsumeQueueRocksDBStorage (Java) │
+│ - CqCompactionFilterJni.createAndSetFilter(cqCfOpts)│
+│ - CqCompactionFilterJni.setMinPhyOffset(offset) │
+└──────────────────┬───────────────────────────────────┘
+ │
+ ▼
+┌──────────────────────────────────────────────────────┐
+│ CqCompactionFilterJni.java │
+│ - Extracts libcq_compaction_filter.so to the same │
+│ temp dir as the already-loaded rocksdbjni .so │
+│ - Uses NativeCqCompactionFilter wrapper with │
+│ disOwnNativeHandle() + public setCompactionFilter │
+│ - Calls native createNativeFilter0() → raw pointer │
+└──────────────────┬───────────────────────────────────┘
+ │
+ ▼
+┌──────────────────────────────────────────────────────┐
+│ libcq_compaction_filter.so (native shim) │
+│ │
+│ class CqCompactionFilter │
+│ : public rocksdb::CompactionFilter { ... } │
+│ │
+│ JNI: createNativeFilter0() → new CqCompactionFilter │
+│ JNI: setMinPhyOffset0(ptr, offset) │
+│ │
+│ NEEDED: librocksdbjni-linux64.so ($ORIGIN RPATH) │
+└──────────────────┬───────────────────────────────────┘
+ │
+ ▼
+┌──────────────────────────────────────────────────────┐
+│ librocksdbjni-linux64.so (official rocksdbjni) │
+│ - All RocksDB C++ classes (CompactionFilter, etc.) │
+│ - JNI glue for all Java↔C++ bindings │
+│ - Compiled with -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0│
+└──────────────────────────────────────────────────────┘
+```
+
+### Key design decisions
+
+**1. Direct C++ subclassing with explicit linking**
+
+The shim directly subclasses `rocksdb::CompactionFilter` in C++ and is
compiled with matching ABI flags (`-fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0`) to
match how `librocksdbjni` was built. It is explicitly linked against
`librocksdbjni-linux64.so` (extracted from the `rocksdbjni` jar) with `$ORIGIN`
RPATH so the dynamic linker resolves symbols from the same directory.
+
+This replaced an earlier dlopen/RTLD_GLOBAL approach that caused C++ `double
free` crashes — loading the same `.so` twice (once via JVM's `RTLD_LOCAL` and
once via `RTLD_GLOBAL`) creates conflicting C++ global state (memory
allocators, static singletons, vtables).
+
+**2. Raw pointer as jlong, wrapped with disOwnNativeHandle()**
+
+The native shim creates `new CqCompactionFilter()` and returns the raw C++
pointer as a `jlong`. A thin Java wrapper `NativeCqCompactionFilter extends
AbstractCompactionFilter<Slice>` passes this pointer to the protected
`AbstractCompactionFilter(long)` constructor, then calls `disOwnNativeHandle()`
so that `close()` does not free the native memory. This is critical because
`AbstractRocksDBStorage.shutdown()` closes `ColumnFamilyOptions` (step 2)
before closing the DB (step 4) — without [...]
+
+**3. Shared temp directory for .so resolution**
+
+At runtime, `CqCompactionFilterJni` loads `librocksdbjni-linux64.so` from the
rocksdbjni JAR first (via `System.loadLibrary` or extraction to a temp dir),
then extracts `libcq_compaction_filter.so` to the same temp directory. This
ensures the `$ORIGIN` RPATH in the shim correctly resolves its `NEEDED`
dependency on `librocksdbjni-linux64.so`. The rocksdbjni native library is NOT
bundled in the RocketMQ repository — it is sourced from the
`org.rocksdb:rocksdbjni:8.4.4` JAR at runtime.
+
+**4. Thread-safe minPhyOffset with std::atomic**
+
+The `CqCompactionFilter` uses `std::atomic<int64_t>` with
`memory_order_relaxed` for `min_phy_offset_`. This is sufficient because there
is a single writer (Java main thread via JNI) and one reader (compaction
background thread), and eventual consistency is acceptable — a slightly stale
threshold only means a few extra entries survive one compaction cycle. This
replaces the earlier `pthread_mutex` approach, eliminating per-entry
lock/unlock overhead during full compaction over hundreds o [...]
+
+## Changed files
+
+| File | Change |
+|------|--------|
+| `pom.xml` | `rocksdb.version` → `rocksdbjni.version=8.4.4`; dependency
changed to `org.rocksdb:rocksdbjni` |
+| `common/pom.xml` | `rocketmq-rocksdb` → `org.rocksdb:rocksdbjni` |
+| `common/.../config/AbstractRocksDBStorage.java` |
`manualCompactionDefaultCfRange` enhanced with `estimateNumKeys` logging
(before/after key count, elapsed time, reduction ratio); `manualCompaction`
removed unused `minPhyOffset` parameter |
+| `store/.../rocksdb/ConsumeQueueCompactionFilterFactory.java` | **Deleted** —
replaced by native shim |
+| `store/.../rocksdb/ConsumeQueueRocksDBStorage.java` | Use
`CqCompactionFilterJni.createAndSetFilter()` instead of
`CompactionFilterFactory`; added `triggerCompactionSync()`, `flushAll()` and
`countEntries()` helpers |
+| `store/.../rocksdb/RocksDBOptionsFactory.java` | Remove
`setCompactionFilterFactory()` call from `createCQCFOptions()` |
+| `store/.../rocksdb/CqCompactionFilterJni.java` | **New** — uses raw
JNI pointer + `NativeCqCompactionFilter` wrapper via public API; includes
platform-aware library name detection (macOS `.dylib` / Linux `.so` / Windows
`.dll`) |
+| `store/.../rocksdb/NativeCqCompactionFilter.java` | **New** — thin
`AbstractCompactionFilter<Slice>` wrapper with `disOwnNativeHandle()` |
+| `store/.../resources/native/cq_compaction_filter.cpp` | **Rewritten** —
direct C++ subclassing, explicit linking, `std::atomic` for thread safety |
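+| `store/.../resources/native/libcq_compaction_filter.so` | **New** —
pre-compiled native library (Linux x86_64) |
+| `store/.../resources/native/libcq_compaction_filter_aarch64.so` | **New** —
pre-compiled native library (Linux aarch64) |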
+| `store/.../resources/native/libcq_compaction_filter.dylib` | **New** —
pre-compiled native library (macOS arm64) |
+| `store/.../resources/native/cq_compaction_filter.dll` | **New** —
pre-compiled native library (Windows x86_64, MSVC v14.29) |
+| `store/.../rocksdb/CqCompactionFilterJniTest.java` | **New** — integration
test for compaction filter |
+
+## Building the native shim
+
+Prerequisites: `g++` / `clang++`, RocksDB C++ headers matching `rocksdbjni`
version (8.4.4), JNI headers from your JDK.
+
+### Linux (x86_64)
+
+```bash
+# 1. Extract librocksdbjni from the rocksdbjni jar
+ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar
+unzip -j "$ROCKSDB_JAR" librocksdbjni-linux64.so -d /tmp/rocksdb-native/
+
+# 2. Download matching RocksDB headers
+wget https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz
+tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1
+
+# 3. Compile the shim with explicit linking
+export JAVA_HOME=/usr/lib/jvm/java-8 # or your JDK path
+g++ -shared -fPIC -O2 -std=c++17 -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0 \
+ -I./include \
+ -I${JAVA_HOME}/include \
+ -I${JAVA_HOME}/include/linux \
+ -Wl,--no-undefined \
+ -Wl,-rpath,\$ORIGIN \
+ -L/tmp/rocksdb-native \
+ -l:librocksdbjni-linux64.so \
+ -o libcq_compaction_filter.so \
+ store/src/main/resources/native/cq_compaction_filter.cpp
+
+# 4. Verify NEEDED and RPATH
+readelf -d libcq_compaction_filter.so | grep -E "NEEDED|RPATH"
+# Should show: NEEDED librocksdbjni-linux64.so, RPATH $ORIGIN
+
+# 5. Replace the pre-built .so
+cp libcq_compaction_filter.so store/src/main/resources/native/
+```
+
+### macOS (arm64 / x86_64)
+
+On macOS, the rocksdbjni jar uses `.jnilib` extension (not `.so` or bare
names) and the native library names differ from Linux. Key gotchas:
+- Apple Silicon uses `librocksdbjni-osx-arm64.jnilib` (not
`librocksdbjni-osx-aarch64` as the filename pattern might suggest)
+- macOS `ld` does NOT support the `-l:` syntax used on Linux — pass the
`.jnilib` file directly
+- After linking, use `install_name_tool` to fix the absolute install name to
`@loader_path`, otherwise the shim fails to resolve the rocksdbjni dependency
at runtime
+- GitHub downloads may be blocked by corporate firewalls; use a mirror (e.g.
`ghproxy.net`) or a local RocksDB checkout for headers
+
+```bash
+# 1. Extract the macOS native library from rocksdbjni jar
+# The jar contains librocksdbjni-osx-arm64.jnilib (arm64) or
librocksdbjni-osx-x86_64.jnilib
+ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar
+mkdir -p /tmp/rocksdb-native
+
+# For Apple Silicon (arm64):
+unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-arm64.jnilib -d /tmp/rocksdb-native/
+
+# For Intel Mac (x86_64):
+unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-x86_64.jnilib -d /tmp/rocksdb-native/
+
+# 2. Download matching RocksDB headers
+# Use ghproxy.net mirror if github.com is blocked:
+curl -sL
"https://ghproxy.net/https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz"
-o /tmp/rocksdb-8.4.4.tar.gz
+tar xzf /tmp/rocksdb-8.4.4.tar.gz -C /tmp rocksdb-8.4.4/include
--strip-components=1
+# Or use a local RocksDB checkout if available:
+# ROCKSDB_INCLUDE=/path/to/rocksdb/include
+
+# 3. Compile the shim — pass .jnilib directly (macOS ld does NOT support -l:
syntax)
+export JAVA_HOME=$(/usr/libexec/java_home)
+ROCKSDB_INCLUDE=${ROCKSDB_INCLUDE:-/tmp/include} # step 2 extracts the headers
to /tmp/include; adjust if yours are elsewhere
+ROCKSDB_JNILIB=/tmp/rocksdb-native/librocksdbjni-osx-arm64.jnilib # or
-x86_64.jnilib
+clang++ -shared -fPIC -O2 -std=c++17 -fno-rtti \
+ -I"$ROCKSDB_INCLUDE" \
+ -I${JAVA_HOME}/include \
+ -I${JAVA_HOME}/include/darwin \
+ -Wl,-undefined,error \
+ "$ROCKSDB_JNILIB" \
+ -o /tmp/rocksdb-native/libcq_compaction_filter.dylib \
+ store/src/main/resources/native/cq_compaction_filter.cpp
+
+# 4. Fix the install_name to use @loader_path for runtime resolution
+# Without this, otool -L shows an absolute path to the build directory
+install_name_tool -change "$ROCKSDB_JNILIB" "@loader_path/$(basename
$ROCKSDB_JNILIB)" \
+ /tmp/rocksdb-native/libcq_compaction_filter.dylib
+
+# 5. Verify dependencies
+otool -L /tmp/rocksdb-native/libcq_compaction_filter.dylib
+# Should show @loader_path/librocksdbjni-osx-arm64.jnilib (or -x86_64.jnilib)
+
+# 6. Place the output
+cp /tmp/rocksdb-native/libcq_compaction_filter.dylib
store/src/main/resources/native/
+```
+
+### Windows (x86_64)
+
+**⚠ Must use MSVC — MinGW is NOT compatible**
+
+The official `librocksdbjni-win64.dll` is compiled with MSVC. MinGW-w64
produces incompatible C++ binaries (different vtable layout, name mangling,
exception handling). Attempting to link a MinGW-compiled shim against the
MSVC-compiled rocksdbjni DLL will cause undefined symbol errors at link time
and crashes at runtime.
+
+**Option A: Native MSVC build (required for Windows)**
+
+1. Install Visual Studio Build Tools 2019 (v14.29, matching the rocksdbjni
linker version 14.29.30159).
+2. Use the x64 Native Tools Command Prompt or set up the environment manually.
+
+```powershell
+# 1. Set up environment (run vcvarsall.bat first, or use the VS Dev Command
Prompt)
+set "VCTools=C:\Program Files\Microsoft Visual
Studio\2022\BuildTools\VC\Tools\MSVC\14.29.30133"
+set "SDK=C:\Program Files (x86)\Windows Kits\10"
+set "JAVA_HOME=C:\path\to\jdk8"
+
+# 2. Extract RocksDB headers
+curl -LO https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz
+tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1
+
+# 3. Compile with MSVC cl.exe (must use /GR- to disable RTTI, matching
rocksdbjni)
+cl.exe /LD /O2 /std:c++17 /GR- /EHsc /utf-8 ^
+ /I"%JAVA_HOME%\include" ^
+ /I"%JAVA_HOME%\include\win32" ^
+ /I"include" ^
+ /I"%VCTools%\include" ^
+ /I"%SDK%\Include\10.0.19041.0\ucrt" ^
+ /I"%SDK%\Include\10.0.19041.0\shared" ^
+ /I"%SDK%\Include\10.0.19041.0\um" ^
+ /Fecq_compaction_filter.dll ^
+ store\src\main\resources\native\cq_compaction_filter.cpp ^
+ /link /MACHINE:X64 ^
+ /LIBPATH:"%VCTools%\lib\x64" ^
+ /LIBPATH:"%SDK%\Lib\10.0.19041.0\ucrt\x64" ^
+ /LIBPATH:"%SDK%\Lib\10.0.19041.0\um\x64"
+
+# 4. Verify exports
+dumpbin /exports cq_compaction_filter.dll
+# Should show:
Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0
+#
Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0
+
+# 5. Place the output
+copy cq_compaction_filter.dll store\src\main\resources\native\
+```
+
+> **Note on Git Bash / MSYS2**: When running `cl.exe` from Git Bash, MSYS2's
automatic path conversion will corrupt `/LD`, `/O2` etc. into `C:/Program/LD`
etc. Use `MSYS2_ARG_CONV_EXCL='*'` to disable this, or run from `cmd.exe` /
PowerShell directly.
+
+**Windows build troubleshooting**
+
+| Problem | Cause | Solution |
+|---------|-------|----------|
+| `cl: warning D9024: cannot recognize source file type` | MSYS2/Git Bash
converts `/LD` to `C:/Program/LD` | Prefix command with
`MSYS2_ARG_CONV_EXCL='*'` or use `cmd.exe` |
+| `fatal error C1083: cannot open include file 'atomic'` | MSVC C++ headers
directory not in include path | Add `/I"%VCTools%\include"` (from VS Build
Tools) |
+| `LNK2019: unresolved external symbol ... Configurable::GetOption` |
`CompactionFilter` inherits from `Configurable`/`Customizable`, whose virtual
methods are not exported by `librocksdbjni-win64.dll` | Provide inline stub
implementations in your `.cpp` for all unexported pure virtual methods |
+| `LNK2019: unresolved external symbol ...
Status::Status(Code,SubCode,Slice,Slice,Severity)` |
`Status::NotSupported("msg")` calls the non-inline 5-parameter constructor
(defined in `status.cc`, not exported by DLL) | Use `return Status();` instead
of `return Status::NotSupported("...");` — `Status()` default constructor is
fully inline |
+
+**Option B: Run on WSL (recommended for development)**
+
+Run the entire RocketMQ build and test under WSL (Windows Subsystem for
Linux). This uses the native Linux toolchain and pre-built `.so` with zero code
changes:
+
+```bash
+# In WSL (Ubuntu)
+java -version # should show WSL JDK
+mvn test -pl store -Dtest=CqCompactionFilterJniTest -Djacoco.skip=true
+```
+
+## Platform support
+
+`CqCompactionFilterJni.java` automatically detects the OS and architecture at
runtime, selecting the correct library name and extension.
+
+| Platform | Library name | Architecture | Status |
+|----------|-------------|--------------|--------|
+| Linux (glibc) | `libcq_compaction_filter.so` | x86_64 | Pre-built |
+| Linux (glibc) | `libcq_compaction_filter_aarch64.so` | aarch64 | Pre-built |
+| macOS | `libcq_compaction_filter.dylib` | arm64 | Pre-built |
+| macOS | `libcq_compaction_filter.dylib` | x86_64 | Requires rebuild |
+| Windows | `cq_compaction_filter.dll` | x86_64 | Pre-built |
+
+## Limitations
+
+1. **Jacoco incompatibility** — The jacoco Java agent can cause native crashes
when combined with dynamically loaded native libraries. Unit tests should be
run with `-Djacoco.skip=true` when testing RocksDB functionality.
+
+2. **Global singleton filter** — `CqCompactionFilterJni` stores the native
filter pointer in a static `AtomicLong NATIVE_FILTER_PTR`. Only one filter
instance is tracked globally per JVM. If multiple `ConsumeQueueRocksDBStorage`
instances exist (e.g., in tests or multi-Broker processes), `setMinPhyOffset()`
always updates the last-created filter. Earlier instances lose their threshold
updates silently.
+
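+3. **C++17 required** — The shim is compiled with `-std=c++17` (`/std:c++17`
on MSVC), as shown in the build commands above, so a C++17-capable compiler is
required. `std::atomic<int64_t>` itself has been available since C++11. All
modern compilers (GCC 7+, Clang 5+, MSVC 2017+) support C++17.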
+
+4. **Shim depends on rocksdbjni native library at runtime** — The
`libcq_compaction_filter.so` has a `DT_NEEDED` entry for
`librocksdbjni-linux64.so` (~13 MB). The `CqCompactionFilterJni` class handles
this by extracting the shim to the same temp directory as the rocksdbjni native
library, so the `$ORIGIN` RPATH resolves correctly without requiring
`LD_LIBRARY_PATH`.
+
+5. **Windows requires MSVC** — `librocksdbjni-win64.dll` is compiled with MSVC
and does not export C++ base class symbols (`Configurable`/`Customizable`
vtable methods, `Status` constructors). A MinGW-compiled shim cannot link
against it. Must use the same MSVC version (v14.29 for rocksdbjni 8.4.4) and
provide inline stubs for unexported virtual methods.
diff --git a/pom.xml b/pom.xml
index 893e58b496..868faa57d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@
<opentelemetry-exporter-prometheus.version>1.47.0-alpha</opentelemetry-exporter-prometheus.version>
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
<s3.version>2.20.29</s3.version>
- <rocksdb.version>1.0.6</rocksdb.version>
+ <rocksdbjni.version>8.4.4</rocksdbjni.version>
<jackson-databind.version>2.13.4.2</jackson-databind.version>
<sofa-jraft.version>1.3.14</sofa-jraft.version>
@@ -761,9 +761,9 @@
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-rocksdb</artifactId>
- <version>${rocksdb.version}</version>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>${rocksdbjni.version}</version>
</dependency>
<dependency>
<groupId>io.github.aliyunmq</groupId>
diff --git a/store/pom.xml b/store/pom.xml
index 1600a007e0..70ecafe428 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -71,5 +71,9 @@
<groupId>io.github.aliyunmq</groupId>
<artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
deleted file mode 100644
index f19fb9e203..0000000000
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.rocksdb;
-
-import java.util.function.LongSupplier;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.rocksdb.AbstractCompactionFilter;
-import org.rocksdb.AbstractCompactionFilterFactory;
-import org.rocksdb.RemoveConsumeQueueCompactionFilter;
-
-public class ConsumeQueueCompactionFilterFactory extends
AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
- private final LongSupplier minPhyOffsetSupplier;
-
- public ConsumeQueueCompactionFilterFactory(final LongSupplier
minPhyOffsetSupplier) {
- this.minPhyOffsetSupplier = minPhyOffsetSupplier;
- }
-
- @Override
- public String name() {
- return "ConsumeQueueCompactionFilterFactory";
- }
-
- @Override
- public RemoveConsumeQueueCompactionFilter createCompactionFilter(final
AbstractCompactionFilter.Context context) {
- long minPhyOffset = this.minPhyOffsetSupplier.getAsLong();
- LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual:
{}",
- minPhyOffset, context.isFullCompaction(),
context.isManualCompaction());
- return new RemoveConsumeQueueCompactionFilter(minPhyOffset);
- }
-}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index 4392283c67..925d8f91c7 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -21,10 +21,14 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -33,13 +37,13 @@ import org.rocksdb.WriteBatch;
public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+
public static final byte[] OFFSET_COLUMN_FAMILY =
"offset".getBytes(StandardCharsets.UTF_8);
private final MessageStore messageStore;
private volatile ColumnFamilyHandle offsetCFHandle;
- private ConsumeQueueCompactionFilterFactory compactionFilterFactory;
-
public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final
String dbPath) {
super(dbPath);
this.messageStore = messageStore;
@@ -67,20 +71,27 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
final List<ColumnFamilyDescriptor> cfDescriptors = new
ArrayList<>();
- this.compactionFilterFactory = new
ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
-
- ColumnFamilyOptions cqCfOptions =
RocksDBOptionsFactory.createCQCFOptions(this.messageStore,
this.compactionFilterFactory);
+ ColumnFamilyOptions cqCfOptions =
RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
this.cfOptions.add(cqCfOptions);
cfDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
ColumnFamilyOptions offsetCfOptions =
RocksDBOptionsFactory.createOffsetCFOptions();
this.cfOptions.add(offsetCfOptions);
cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY,
offsetCfOptions));
+
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.createAndSetFilter(cqCfOptions);
+
CqCompactionFilterJni.setMinPhyOffset(messageStore.getMinPhyOffset());
+ log.info("CqCompactionFilter created and set, minPhyOffset:
{}", messageStore.getMinPhyOffset());
+ } else {
+ log.warn("CqCompactionFilterJni native library not loaded,
compaction filter will not be installed");
+ }
+
open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
this.offsetCFHandle = cfHandles.get(1);
} catch (final Exception e) {
- LOGGER.error("postLoad Failed. {}", this.dbPath, e);
+ log.error("postLoad Failed. {}", this.dbPath, e);
return false;
}
return true;
@@ -91,11 +102,6 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
if (this.offsetCFHandle != null) {
this.offsetCFHandle.close();
}
-
- if (this.compactionFilterFactory != null) {
- this.compactionFilterFactory.close();
- }
-
}
public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -116,10 +122,13 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
}
public void manualCompaction(final long minPhyOffset) {
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+ }
try {
- manualCompaction(minPhyOffset, this.compactRangeOptions);
+ super.manualCompaction(this.compactRangeOptions);
} catch (Exception e) {
- LOGGER.error("manualCompaction Failed. minPhyOffset: {}",
minPhyOffset, e);
+ log.error("manualCompaction Failed. minPhyOffset: {}",
minPhyOffset, e);
}
}
@@ -130,4 +139,41 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
public ColumnFamilyHandle getOffsetCFHandle() {
return this.offsetCFHandle;
}
+
+ /**
+ * Synchronously trigger compaction with an updated compaction filter
threshold.
+ * This method updates the native compaction filter's minPhyOffset and then
+ * performs a full compaction on the default column family.
+ */
+ public void triggerCompactionSync(long minPhyOffset) throws
RocksDBException {
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+ }
+ db.compactRange(this.defaultCFHandle);
+ }
+
+ /**
+ * Flush all memtables to SST files.
+ */
+ public void flushAll() throws RocksDBException {
+ try (FlushOptions flushOpts = new FlushOptions()) {
+ flushOpts.setWaitForFlush(true);
+ flush(flushOpts);
+ }
+ }
+
+ /**
+ * Count all entries in the default column family by iterating. O(N), use
only in tests.
+ */
+ public long countEntries() {
+ long count = 0;
+ try (RocksIterator iter = db.newIterator(this.defaultCFHandle)) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ count++;
+ iter.next();
+ }
+ }
+ return count;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
new file mode 100644
index 0000000000..69d74e3364
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.rocksdb.ColumnFamilyOptions;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class CqCompactionFilterJni {
+
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+
+ private static final AtomicLong NATIVE_FILTER_PTR = new AtomicLong(0);
+ private static volatile boolean loaded = false;
+
+ /** Platform-specific shim library name and extension. */
+ private static final String SHIM_LIB_NAME;
+ private static final String SHIM_LIB_EXTENSION;
+ private static final String ROCKSDB_JNI_LIB_NAME;
+
+ static {
+ String os = System.getProperty("os.name").toLowerCase();
+ String arch = System.getProperty("os.arch");
+ if (os.contains("mac") || os.contains("darwin") || os.contains("osx"))
{
+ SHIM_LIB_NAME = "libcq_compaction_filter.dylib";
+ SHIM_LIB_EXTENSION = ".dylib";
+ ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") ||
arch.contains("arm")
+ ? "librocksdbjni-osx-aarch64"
+ : "librocksdbjni-osx-x86_64";
+ } else if (os.contains("win")) {
+ SHIM_LIB_NAME = "cq_compaction_filter.dll";
+ SHIM_LIB_EXTENSION = ".dll";
+ ROCKSDB_JNI_LIB_NAME = "librocksdbjni-win64.dll";
+ } else {
+ SHIM_LIB_NAME = arch.contains("aarch") || arch.contains("arm")
+ ? "libcq_compaction_filter_aarch64.so"
+ : "libcq_compaction_filter.so";
+ SHIM_LIB_EXTENSION = ".so";
+ ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") ||
arch.contains("arm")
+ ? "librocksdbjni-linux-aarch64.so"
+ : "librocksdbjni-linux64.so";
+ }
+ }
+
+ static {
+ loadNativeShim();
+ }
+
+ private static synchronized void loadNativeShim() {
+ if (loaded) {
+ return;
+ }
+
+ // Preload RocksDB's native library so that linked symbols are
available
+ // when our compaction filter shim is loaded.
+ String rocksdbDir = ensureRocksDBNativeLoaded();
+
+ String libName = SHIM_LIB_NAME;
+ try (InputStream is = CqCompactionFilterJni.class
+ .getClassLoader().getResourceAsStream("native/" + libName)) {
+ if (is == null) {
+ log.error("[CqCompactionFilterJni] Native library '{}' not
found on classpath", libName);
+ return;
+ }
+ File tempLib;
+ if (rocksdbDir != null) {
+ // Extract our shim to the same temp directory as the RocksDB
JNI library,
+ // so that the DT_NEEDED / LC_LOAD_DYLIB dependency can be
resolved.
+ tempLib = new File(rocksdbDir, libName);
+ } else {
+ // RocksDB was loaded from java.library.path; our shim can go
anywhere.
+ tempLib = File.createTempFile("cq_compaction_filter_",
SHIM_LIB_EXTENSION);
+ }
+ Files.copy(is, tempLib.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ tempLib.deleteOnExit();
+ System.load(tempLib.getAbsolutePath());
+ loaded = true;
+ log.info("[CqCompactionFilterJni] Native library loaded from
classpath: {}", tempLib.getAbsolutePath());
+ } catch (IOException e) {
+ log.error("[CqCompactionFilterJni] Failed to load native shim", e);
+ }
+ }
+
+ /**
+ * Returns whether the native compaction filter shim was successfully
loaded.
+ */
+ public static boolean isLoaded() {
+ return loaded;
+ }
+
+ /**
+ * Locates and loads the RocksDB native JNI library, returning the
temporary
+ * directory in which it was extracted (or null if loaded from
java.library.path).
+ * <p>
+ * This method deliberately uses {@code System.loadLibrary("rocksdbjni")}
+ * rather than {@code RocksDB.loadLibrary()} for the following reasons:
+ * <ol>
+ * <li><b>Avoid unnecessary side effects</b> — {@code
RocksDB.loadLibrary()}
+ * iterates over all compression types (snappy, lz4, zstd, bzip2, etc.)
+ * and attempts to load each one. Those libraries are not needed by
this
+ * compaction filter, and the resulting {@code UnsatisfiedLinkError}s
slow
+ * down startup and pollute logs.</li>
+ * <li><b>Control the temp directory location</b> — The caller needs to
know
+ * the directory where the native JNI library was extracted so that
+ * {@code libcq_compaction_filter.so} can be placed alongside it. This
is
+ * required for the dynamic linker to resolve the {@code DT_NEEDED}
+ * dependency of the custom shim. {@code RocksDB.loadLibrary()}
extracts
+ * to an internal temp directory that is not exposed to callers.</li>
+ * <li><b>Avoid class-loading coupling</b> — {@code RocksDB.loadLibrary()}
+ * triggers the full initialization chain of the rocksdbjni Java
bindings
+ * (including {@code CompressionType.values()} iteration and a
singleton
+ * {@code NativeLibraryLoader} state machine). Loading the custom shim
+ * must complete before any RocksDB Java classes are exercised, to
avoid
+ * native symbol resolution race conditions.</li>
+ * </ol>
+ *
+ * @return the absolute path of the temporary directory containing the
+ * extracted RocksDB JNI library, or null if the library was loaded
+ * from {@code java.library.path} (in which case no temp directory
+ * is needed for the shim).
+ */
+ private static String ensureRocksDBNativeLoaded() {
+ // Try System.loadLibrary first (works if on java.library.path)
+ try {
+ System.loadLibrary("rocksdbjni");
+ // No temp dir needed since it's on java.library.path
+ return null;
+ } catch (UnsatisfiedLinkError ignored) {
+ // Not on java.library.path, try from JAR
+ }
+
+ // Determine the platform-specific JNI library name from RocksDB's
Environment
+ String jniLibName;
+ try {
+ jniLibName =
org.rocksdb.util.Environment.getJniLibraryFileName("rocksdb");
+ } catch (Exception e) {
+ jniLibName = ROCKSDB_JNI_LIB_NAME;
+ }
+
+ try (InputStream is =
CqCompactionFilterJni.class.getClassLoader().getResourceAsStream(jniLibName)) {
+ if (is == null) {
+ log.error("[CqCompactionFilterJni] RocksDB native library '{}'
not found on classpath", jniLibName);
+ return null;
+ }
+ // Create a temp directory and extract the library there.
+ // Our shim will be placed in the same directory so the DT_NEEDED
+ // dependency resolves correctly.
+ File tempDir =
Files.createTempDirectory("rocksdb-native").toFile();
+ tempDir.deleteOnExit();
+ File tempLib = new File(tempDir, jniLibName);
+ Files.copy(is, tempLib.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ tempLib.deleteOnExit();
+ System.load(tempLib.getAbsolutePath());
+ return tempDir.getAbsolutePath();
+ } catch (IOException e) {
+ log.error("[CqCompactionFilterJni] Failed to extract RocksDB
native library", e);
+ return null;
+ }
+ }
+
+ /**
+ * Create a native CqCompactionFilter instance.
+ * Returns the raw C++ pointer as a jlong.
+ */
+ public static native long createNativeFilter0();
+
+ /**
+ * Update the minPhyOffset threshold on an existing native filter.
+ */
+ public static native void setMinPhyOffset0(long filterPtr, long
minPhyOffset);
+
+ /**
+ * Set the native compaction filter on the ColumnFamilyOptions via the
+ * public {@code setCompactionFilter} API.
+ * <p>
+ * The wrapper uses {@code disOwnNativeHandle()} so that closing the
+ * ColumnFamilyOptions does not free the native filter — this prevents
+ * use-after-free when AbstractRocksDBStorage closes options before the DB.
+ */
+ public static void setNativeFilter(ColumnFamilyOptions options, long
filterPtr) {
+ NativeCqCompactionFilter filter = new
NativeCqCompactionFilter(filterPtr);
+ options.setCompactionFilter(filter);
+ }
+
+ /**
+ * Create the native filter and set it on the ColumnFamilyOptions.
+ * Returns the native pointer for later threshold updates.
+ */
+ @SuppressWarnings("UnusedReturnValue")
+ public static long createAndSetFilter(ColumnFamilyOptions options) {
+ long ptr = createNativeFilter0();
+ NATIVE_FILTER_PTR.set(ptr);
+ setNativeFilter(options, ptr);
+ return ptr;
+ }
+
+ /**
+ * Update the minPhyOffset on the current native filter.
+ */
+ public static void setMinPhyOffset(long minPhyOffset) {
+ long ptr = NATIVE_FILTER_PTR.get();
+ if (ptr != 0) {
+ setMinPhyOffset0(ptr, minPhyOffset);
+ log.info("CqCompactionFilter setMinPhyOffset={}", minPhyOffset);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
new file mode 100644
index 0000000000..6a3101c261
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.Slice;
+
+/**
+ * Java-side handle for a CqCompactionFilter that was allocated in native code by
+ * {@link CqCompactionFilterJni#createNativeFilter0()}.
+ * <p>
+ * The wrapper gives up ownership of the native object right after construction, so
+ * {@code close()} leaves the native memory alone. That is required because
+ * {@code AbstractRocksDBStorage} closes {@code ColumnFamilyOptions} (and with it this
+ * wrapper) before the DB is closed, while background compaction threads may still be
+ * using the filter. The native object's lifetime is managed outside this class.
+ */
+class NativeCqCompactionFilter extends AbstractCompactionFilter<Slice> {
+
+    /**
+     * @param nativeHandle raw pointer of the native filter to wrap
+     */
+    NativeCqCompactionFilter(final long nativeHandle) {
+        super(nativeHandle);
+        // Never let close() delete the native filter.
+        this.disOwnNativeHandle();
+    }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index b74cf8c85d..37eec67d35 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -41,8 +41,7 @@ import org.rocksdb.util.SizeUnit;
public class RocksDBOptionsFactory {
- public static ColumnFamilyOptions createCQCFOptions(final MessageStore
messageStore,
- ConsumeQueueCompactionFilterFactory
consumeQueueCompactionFilterFactory) {
+ public static ColumnFamilyOptions createCQCFOptions(final MessageStore
messageStore) {
BlockBasedTableConfig blockBasedTableConfig = new
BlockBasedTableConfig().
setFormatVersion(5).
setIndexType(IndexType.kBinarySearch).
@@ -93,7 +92,6 @@ public class RocksDBOptionsFactory {
setTargetFileSizeBase(256 * SizeUnit.MB).
setTargetFileSizeMultiplier(2).
setMergeOperator(new StringAppendOperator()).
-
setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
setReportBgIoStats(true).
setOptimizeFiltersForHits(true);
}
diff --git a/store/src/main/resources/native/cq_compaction_filter.cpp
b/store/src/main/resources/native/cq_compaction_filter.cpp
new file mode 100644
index 0000000000..1d0bd84cd9
--- /dev/null
+++ b/store/src/main/resources/native/cq_compaction_filter.cpp
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Native compaction filter for ConsumeQueue entries.
+ *
+ * Subclass rocksdb::CompactionFilter directly, create instances in C++,
+ * and pass the raw C++ pointer as a jlong to Java. Java's
+ * AbstractCompactionFilter(nativeHandle) wraps it seamlessly.
+ *
+ * All rocksdb symbols are declared weak so they resolve at runtime to the
+ * symbols already loaded by the JVM's ClassLoader.
+ */
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+
+/* ------------------------------------------------------------------ */
+/* Windows stub implementations */
+/* */
+/* On Linux/macOS, ELF/Mach-O shared libraries export all symbols by */
+/* default, so the shim resolves inherited virtual methods from */
+/* librocksdbjni at link time. On Windows, DLLs only export symbols */
+/* marked __declspec(dllexport) — rocksdbjni only exports JNI entry */
+/* points, not internal C++ class methods. We must provide stub */
+/* implementations for the Configurable/Customizable virtual methods */
+/* that appear in CompactionFilter's vtable. These stubs are never */
+/* called at runtime (RocksDB only invokes Filter() and Name() on */
+/* compaction filters), but the linker needs addresses for them. */
+/* ------------------------------------------------------------------ */
+
+#ifdef _WIN32 // Windows only: link-time stubs for RocksDB symbols the shim cannot import from rocksdbjni
+
+#include "rocksdb/configurable.h"
+#include "rocksdb/customizable.h"
+#include <unordered_map>
+#include <unordered_set>
+
+namespace rocksdb { // the stubs define members of rocksdb:: classes, so they must live in this namespace
+
+struct ConfigOptions; // forward declarations: these types appear below only by reference or pointer
+struct DBOptions;
+struct ColumnFamilyOptions;
+class OptionTypeInfo;
+
+// --- Configurable virtual methods (defined in options/configurable.cc upstream; no-op stubs here) ---
+
+Status Configurable::GetOption(const ConfigOptions&, const std::string&,
+ std::string*) const {
+ return Status(); // stub: output argument is left untouched
+}
+
+bool Configurable::AreEquivalent(const ConfigOptions&, const Configurable*,
+ std::string*) const {
+ return true; // stub: always reports equivalence
+}
+
+Status Configurable::PrepareOptions(const ConfigOptions&) {
+ return Status();
+}
+
+Status Configurable::ValidateOptions(const DBOptions&,
+ const ColumnFamilyOptions&) const {
+ return Status();
+}
+
+const void* Configurable::GetOptionsPtr(const std::string&) const {
+ return nullptr; // stub: no registered option structs
+}
+
+Status Configurable::ParseStringOptions(const ConfigOptions&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::ConfigureOptions(
+ const ConfigOptions&,
+ const std::unordered_map<std::string, std::string>&,
+ std::unordered_map<std::string, std::string>*) {
+ return Status();
+}
+
+Status Configurable::ParseOption(const ConfigOptions&, const OptionTypeInfo&,
+ const std::string&, const std::string&,
+ void*) {
+ return Status();
+}
+
+bool Configurable::OptionsAreEqual(const ConfigOptions&, const OptionTypeInfo&,
+ const std::string&, const void*,
+ const void*, std::string*) const {
+ return true;
+}
+
+std::string Configurable::SerializeOptions(const ConfigOptions&,
+ const std::string&) const {
+ return ""; // stub: nothing to serialize
+}
+
+std::string Configurable::GetOptionName(const std::string& name) const {
+ return name; // stub: name is passed through unchanged
+}
+
+// Non-virtual, but referenced by inline code paths
+void Configurable::RegisterOptions(const std::string&, void*,
+ const std::unordered_map<std::string, OptionTypeInfo>*) {}
+
+Status Configurable::ConfigureFromMap(
+ const ConfigOptions&,
+ const std::unordered_map<std::string, std::string>&) {
+ return Status();
+}
+
+Status Configurable::ConfigureFromMap(
+ const ConfigOptions&,
+ const std::unordered_map<std::string, std::string>&,
+ std::unordered_map<std::string, std::string>*) {
+ return Status();
+}
+
+Status Configurable::ConfigureOption(const ConfigOptions&, const std::string&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::ConfigureFromString(const ConfigOptions&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::GetOptionString(const ConfigOptions&,
+ std::string*) const {
+ return Status();
+}
+
+std::string Configurable::ToString(const ConfigOptions&,
+ const std::string&) const {
+ return "";
+}
+
+Status Configurable::GetOptionNames(const ConfigOptions&,
+ std::unordered_set<std::string>*) const {
+ return Status();
+}
+
+Status Configurable::GetOptionsMap(
+ const std::string&, const std::string&, std::string*,
+ std::unordered_map<std::string, std::string>*) {
+ return Status();
+}
+
+// --- Customizable virtual/override methods (defined in
options/customizable.cc) ---
+
+Status Customizable::GetOption(const ConfigOptions&, const std::string&,
+ std::string*) const {
+ return Status();
+}
+
+bool Customizable::AreEquivalent(const ConfigOptions&, const Configurable*,
+ std::string*) const {
+ return true;
+}
+
+std::string Customizable::GetOptionName(const std::string& name) const {
+ return name;
+}
+
+std::string Customizable::SerializeOptions(const ConfigOptions&,
+ const std::string&) const {
+ return "";
+}
+
+std::string Customizable::GenerateIndividualId() const {
+ return "stub"; // placeholder id; the same string for every instance
+}
+
+Status Customizable::GetOptionsMap(
+ const ConfigOptions&, const Customizable*, const std::string&,
+ std::string*, std::unordered_map<std::string, std::string>*) {
+ return Status();
+}
+
+Status Customizable::ConfigureNewObject(
+ const ConfigOptions&, Customizable*,
+ const std::unordered_map<std::string, std::string>&) {
+ return Status();
+}
+
+// --- Status methods (defined in util/status.cc upstream) ---
+
+Status::Status(Code _code, SubCode _subcode, const Slice& msg,
+ const Slice& msg2, Severity sev)
+ : code_(_code), subcode_(_subcode), sev_(sev),
+ retryable_(false), data_loss_(false), scope_(0) {} // stub: msg and msg2 are not copied anywhere
+
+std::unique_ptr<const char[]> Status::CopyState(const char* s) {
+ if (s == nullptr) return nullptr;
+ const size_t n = std::strlen(s) + 1; // +1 so the terminating NUL is copied as well
+ char* result = new char[n];
+ std::memcpy(result, s, n);
+ return std::unique_ptr<const char[]>(result);
+}
+
+std::string Status::ToString() const {
+ return "OK"; // stub: always "OK", regardless of code_
+}
+
+} // namespace rocksdb
+
+#endif // _WIN32
+
+/* ------------------------------------------------------------------ */
+/* Our concrete compaction filter */
+/* ------------------------------------------------------------------ */
+
+// Compaction filter that drops ConsumeQueue entries whose physical offset lies below a
+// threshold that can be changed at runtime through SetMinPhyOffset().
+class CqCompactionFilter : public rocksdb::CompactionFilter {
+public:
+    // Name under which the filter identifies itself to RocksDB.
+    const char* Name() const override {
+        return "ConsumeQueueCompactionFilter";
+    }
+
+    // Decides whether an entry is removed during compaction.
+    //
+    // The first 8 bytes of the value are read as a big-endian signed 64-bit physical
+    // offset. Returns true (remove the entry) when that offset is strictly below the
+    // current threshold. Values shorter than 28 bytes are never removed.
+    bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
+                const rocksdb::Slice& existing_value, std::string* /*new_value*/,
+                bool* /*value_changed*/) const override {
+        static const size_t CQ_MIN_SIZE = 28;
+        static const size_t PHY_OFFSET_BYTES = 8;
+        if (existing_value.size() < CQ_MIN_SIZE) {
+            return false;
+        }
+        const unsigned char* data =
+            reinterpret_cast<const unsigned char*>(existing_value.data());
+        // Assemble in an unsigned accumulator: shifting a signed value into the sign bit
+        // is undefined behaviour before C++20, which a first byte >= 0x80 would trigger.
+        uint64_t raw = 0;
+        for (size_t i = 0; i < PHY_OFFSET_BYTES; ++i) {
+            raw = (raw << 8) | static_cast<uint64_t>(data[i]);
+        }
+        const int64_t phy_offset = static_cast<int64_t>(raw);
+
+        const int64_t min_offset = min_phy_offset_.load(std::memory_order_relaxed);
+        return phy_offset < min_offset;
+    }
+
+    // Replaces the threshold used by Filter(). Uses a relaxed atomic store, so the call
+    // does not order any other memory accesses.
+    void SetMinPhyOffset(int64_t offset) {
+        min_phy_offset_.store(offset, std::memory_order_relaxed);
+    }
+
+private:
+    // Entries with a physical offset below this value are removed; starts at 0.
+    std::atomic<int64_t> min_phy_offset_{0};
+};
+
+/* ------------------------------------------------------------------ */
+/* JNI bindings */
+/* ------------------------------------------------------------------ */
+
+#include <jni.h>
+
+extern "C" {
+
+// Allocates a CqCompactionFilter and returns its address as a jlong handle.
+// Nothing in this file frees the object; the Java side treats the handle as long-lived.
+JNIEXPORT jlong JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0(
+        JNIEnv* /*env*/, jclass /*clazz*/) {
+    CqCompactionFilter* filter = new CqCompactionFilter();
+    return reinterpret_cast<jlong>(filter);
+}
+
+// Updates the threshold of the filter identified by filterPtr.
+// A zero handle is ignored: the Java method is public, so a bad caller must not be
+// able to crash the JVM through a null dereference.
+JNIEXPORT void JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0(
+        JNIEnv* /*env*/, jclass /*clazz*/, jlong filterPtr, jlong minPhyOffset) {
+    if (filterPtr == 0) {
+        return;
+    }
+    CqCompactionFilter* filter = reinterpret_cast<CqCompactionFilter*>(filterPtr);
+    filter->SetMinPhyOffset(minPhyOffset);
+}
+
+} // extern "C"
diff --git a/store/src/main/resources/native/cq_compaction_filter.dll
b/store/src/main/resources/native/cq_compaction_filter.dll
new file mode 100755
index 0000000000..2dc74834f4
Binary files /dev/null and
b/store/src/main/resources/native/cq_compaction_filter.dll differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.dylib
b/store/src/main/resources/native/libcq_compaction_filter.dylib
new file mode 100755
index 0000000000..58d6e6796a
Binary files /dev/null and
b/store/src/main/resources/native/libcq_compaction_filter.dylib differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.so
b/store/src/main/resources/native/libcq_compaction_filter.so
new file mode 100755
index 0000000000..46dabe1880
Binary files /dev/null and
b/store/src/main/resources/native/libcq_compaction_filter.so differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter_aarch64.so
b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so
new file mode 100755
index 0000000000..b77869f063
Binary files /dev/null and
b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so differ
diff --git
a/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
new file mode 100644
index 0000000000..eead66b074
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.WriteBatch;
+
+public class CqCompactionFilterJniTest {
+
+ private static final int TOPIC_COUNT = 100;
+ private static final int BATCH_SIZE = 100_000;
+ private static final int MSG_SIZE = 1000;
+
+ private static final byte CTRL_1 = '\u0001';
+ private ConsumeQueueRocksDBStorage storage;
+
+ @Before
+ public void setUp() throws Exception {
+ Assume.assumeTrue("CqCompactionFilterJni native library must be
loaded", CqCompactionFilterJni.isLoaded());
+ String dbPath = Files.createTempDirectory("rocksdb-cq-compaction-" +
UUID.randomUUID()).toString();
+ MessageStore mockStore = Mockito.mock(MessageStore.class);
+ Mockito.when(mockStore.getMinPhyOffset()).thenReturn(0L);
+ Mockito.when(mockStore.getMessageStoreConfig()).thenReturn(new
MessageStoreConfig());
+ storage = new ConsumeQueueRocksDBStorage(mockStore, dbPath);
+ }
+
+ @After
+ public void tearDown() {
+ if (storage != null) {
+ storage.shutdown();
+ storage.destroy();
+ }
+ }
+
+ @Test
+ public void testCreateAndSetFilter() {
+ Assert.assertTrue("Native library should be loaded",
CqCompactionFilterJni.isLoaded());
+
+ long ptr = CqCompactionFilterJni.createNativeFilter0();
+ Assert.assertTrue("Native filter pointer should be non-zero", ptr !=
0);
+
+ CqCompactionFilterJni.setMinPhyOffset0(ptr, 1000);
+ CqCompactionFilterJni.setMinPhyOffset0(ptr, Long.MAX_VALUE);
+
+ try (ColumnFamilyOptions options = new ColumnFamilyOptions()) {
+ CqCompactionFilterJni.setNativeFilter(options, ptr);
+ }
+ }
+
+ @Test
+ public void testCompactionFilter_small() throws Exception {
+ runCompactionTest(1_000_000);
+ }
+
+ @Test
+ public void testCompactionFilter_large() throws Exception {
+ runCompactionTest(10_000_000);
+ }
+
+ private void runCompactionTest(int totalEntries) throws Exception {
+ long start = System.currentTimeMillis();
+ boolean result = storage.start();
+ if (!result) {
+ System.err.println("storage.start() returned false. Check ERROR
logs above for details.");
+ }
+ Assert.assertTrue("ConsumeQueueRocksDBStorage failed to start",
result);
+ log("Startup took %d ms", System.currentTimeMillis() - start);
+
+ // Phase 1: Write entries
+ start = System.currentTimeMillis();
+ writeEntries(totalEntries);
+ long writeTime = System.currentTimeMillis() - start;
+ log("Wrote %d entries in %d ms (%.0f entries/sec)", totalEntries,
writeTime, totalEntries * 1000.0 / writeTime);
+
+ // Phase 2: Count entries before compaction
+ start = System.currentTimeMillis();
+ long countBefore = storage.countEntries();
+ long countTime = System.currentTimeMillis() - start;
+ log("Count before compaction: %d (took %d ms)", countBefore,
countTime);
+ Assert.assertEquals("Entry count should match total written",
totalEntries, countBefore);
+
+ // Flush memtables to SST files so compaction has something to process
+ start = System.currentTimeMillis();
+ storage.flushAll();
+ log("Flush took %d ms", System.currentTimeMillis() - start);
+
+ // Phase 3: Set minPhyOffset at midpoint and trigger compaction
+ long minPhyOffset = (long) (totalEntries / 2.0) * MSG_SIZE;
+ start = System.currentTimeMillis();
+ storage.triggerCompactionSync(minPhyOffset);
+ long compactTime = System.currentTimeMillis() - start;
+ log("Compaction with minPhyOffset=%d took %d ms", minPhyOffset,
compactTime);
+
+ // Phase 4: Count entries after compaction
+ start = System.currentTimeMillis();
+ long countAfter = storage.countEntries();
+ countTime = System.currentTimeMillis() - start;
+ log("Count after compaction: %d (took %d ms)", countAfter, countTime);
+
+ // Verify: approximately half the entries should remain
+ long expectedSurvivors = totalEntries - totalEntries / 2;
+ long tolerance = Math.max(expectedSurvivors / 100, 100);
+ Assert.assertTrue(
+ "Expected ~" + expectedSurvivors + " entries after compaction, but
got " + countAfter,
+ countAfter >= expectedSurvivors - tolerance && countAfter <=
expectedSurvivors + tolerance
+ );
+
+ log("Test passed: %d -> %d entries (expected ~%d)", totalEntries,
countAfter, expectedSurvivors);
+ }
+
+ private void writeEntries(int totalEntries) throws Exception {
+ int entriesPerTopic = totalEntries / TOPIC_COUNT;
+
+ for (int t = 0; t < TOPIC_COUNT; t++) {
+ String topic = "test-topic-" + t;
+ byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+ int queueId = 0;
+
+ try (WriteBatch batch = new WriteBatch()) {
+ for (int i = 0; i < entriesPerTopic; i++) {
+ int globalIndex = t * entriesPerTopic + i;
+
+ // Key:
[topic_len:4][CTRL_1][topic][CTRL_1][queue_id:4][CTRL_1][cq_offset:8]
+ int keyLen = Integer.BYTES + 1 + topicBytes.length + 1 +
Integer.BYTES + 1 + Long.BYTES;
+ ByteBuffer keyBB = ByteBuffer.allocate(keyLen);
+ keyBB.putInt(topicBytes.length)
+ .put(CTRL_1)
+ .put(topicBytes)
+ .put(CTRL_1)
+ .putInt(queueId)
+ .put(CTRL_1)
+ .putLong(i);
+
+ // Value:
[phy_offset:8][msg_size:4][tags_code:8][store_timestamp:8] (28 bytes)
+ long phyOffset = (long) globalIndex * MSG_SIZE;
+ ByteBuffer valueBB = ByteBuffer.allocate(28);
+ valueBB.putLong(phyOffset)
+ .putInt(MSG_SIZE)
+ .putLong(0)
+ .putLong(System.currentTimeMillis());
+
+ batch.put(storage.getDefaultCFHandle(), keyBB.array(),
valueBB.array());
+
+ if ((i + 1) % BATCH_SIZE == 0) {
+ storage.batchPut(batch);
+ }
+ }
+ if (entriesPerTopic % BATCH_SIZE != 0) {
+ storage.batchPut(batch);
+ }
+ }
+ }
+ }
+
+ private void log(String format, Object... args) {
+ System.out.printf("[CqCompactionFilterJniTest] " + format + "%n",
args);
+ }
+}