This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 12774c64be [ISSUE #9779] fix DirectBuffer will cause error at 9+ JDK
version. (#9801)
12774c64be is described below
commit 12774c64bebf4df45701880b585f7e8c5daebe4e
Author: sinberCS <[email protected]>
AuthorDate: Mon Nov 24 18:00:38 2025 +0800
[ISSUE #9779] fix DirectBuffer will cause error at 9+ JDK version. (#9801)
* fix(DirectBuffer): issue#9779, fix the problem of 'DirectBuffer will
cause error at 9+ JDK version'.
Change-Id: I4657ecc401046a3b0d29b466ee68845f45d34105
* fix(DirectBuffer): issue#9779, fix the problem of 'DirectBuffer will
cause error at 9+ JDK version'.
Change-Id: Iff0880f694cccfa86b812b81260dfe09e4763fa9
---
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++---
.../java/org/apache/rocketmq/store/TransientStorePool.java | 11 ++++++-----
.../org/apache/rocketmq/store/logfile/DefaultMappedFile.java | 10 ++++++----
3 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 29b09bc649..d6ea017218 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
+import io.netty.util.internal.PlatformDependent;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
@@ -69,7 +71,6 @@ import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
-import sun.nio.ch.DirectBuffer;
/**
* Store all metadata downtime for recovery, data protection reliability
@@ -2433,7 +2434,7 @@ public class CommitLog implements Swappable {
private byte[] checkFileInPageCache(MappedFile mappedFile) {
long fileSize = mappedFile.getFileSize();
- final long address = ((DirectBuffer)
mappedFile.getMappedByteBuffer()).address();
+ final long address =
PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int pageNums = (int) (fileSize + this.pageSize - 1) /
this.pageSize;
byte[] pageCacheRst = new byte[pageNums];
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new
NativeLong(fileSize), pageCacheRst);
@@ -2509,7 +2510,7 @@ public class CommitLog implements Swappable {
log.error("setFileReadMode mappedFile is null");
return -1;
}
- final long address = ((DirectBuffer)
mappedFile.getMappedByteBuffer()).address();
+ final long address =
PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new
NativeLong(mappedFile.getFileSize()), mode);
if (madvise != 0) {
log.error("setFileReadMode error fileName: {}, madvise: {},
mode:{}", mappedFile.getFileName(), madvise, mode);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index 0d42ee69e6..d9ad4f4ed1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -16,16 +16,17 @@
*/
package org.apache.rocketmq.store;
-import com.sun.jna.NativeLong;
-import com.sun.jna.Pointer;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
+
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import io.netty.util.internal.PlatformDependent;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.util.LibC;
-import sun.nio.ch.DirectBuffer;
public class TransientStorePool {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -48,7 +49,7 @@ public class TransientStorePool {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
- final long address = ((DirectBuffer) byteBuffer).address();
+ final long address =
PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
@@ -58,7 +59,7 @@ public class TransientStorePool {
public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
- final long address = ((DirectBuffer) byteBuffer).address();
+ final long address =
PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index b28b7a8aef..fbfffef3bc 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -37,6 +37,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+
+import io.netty.util.internal.PlatformDependent;
import org.apache.commons.lang3.SystemUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -57,7 +59,7 @@ import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
+
public class DefaultMappedFile extends AbstractMappedFile {
public static final int OS_PAGE_SIZE = 1024 * 4;
@@ -914,7 +916,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
@Override
public void mlock() {
final long beginTime = System.currentTimeMillis();
- final long address = ((DirectBuffer)
(this.mappedByteBuffer)).address();
+ final long address =
PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new
NativeLong(this.fileSize));
@@ -930,7 +932,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
@Override
public void munlock() {
final long beginTime = System.currentTimeMillis();
- final long address = ((DirectBuffer)
(this.mappedByteBuffer)).address();
+ final long address =
PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
int ret = LibC.INSTANCE.munlock(pointer, new
NativeLong(this.fileSize));
log.info("munlock {} {} {} ret = {} time consuming = {}", address,
this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
@@ -1049,7 +1051,7 @@ public class DefaultMappedFile extends AbstractMappedFile
{
return true;
}
try {
- long addr = ((DirectBuffer) mappedByteBuffer).address() + position;
+ long addr =
PlatformDependent.directBufferAddress(mappedByteBuffer) + position;
return (boolean) IS_LOADED_METHOD.invoke(mappedByteBuffer,
mappingAddr(addr), size, pageCount(size));
} catch (Exception e) {
log.info("invoke isLoaded0 of file {} error:",
file.getAbsolutePath(), e);