GitHub user shach1990 edited a discussion: 大量事务消息出现反复回查的情况
**请教各位是否遇到过类似的问题,初步怀疑是否时因为 transientStorePoolEnable 为
true导致,但本地无法复现,但线上环境100%出现,有没有解决方案呢?**
### 背景
事务消息基本上存在反复回查的情况,应用的`TransactionListener`的`executeLocalTransaction`和`checkLocalTransaction`方法都确认返回`COMMIT_MESSAGE`,大部分消息回查1次成功,还有一部分回查几次才成功,还有小部分回查15次都不正常就丢失了。
### 环境
集群3主3从,版本V4.8.0。
### 现象
通过Arthas查到回查调用(`org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest`)返回`Find
prepared transaction message failed`错误日志如下:
```
method=org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest
location=AtExit
ts=2023-06-27 10:26:48; [cost=0.060751ms] result=@ArrayList[
@Object[][
@DefaultChannelHandlerContext[io.netty.channel.DefaultChannelHandlerContext@6bb6e1a1],
@RemotingCommand[RemotingCommand [code=37, language=JAVA, version=373,
opaque=31756804, flag(B)=10, remark=null,
extFields={producerGroup=warehouseProducer, commitLogOffset=48902747609,
msgId=7F00000100016801B4148821A5ACB356, tranStateTableOffset=3210422,
commitOrRollback=8, transactionId=7F00000100016801B4148821A5ACB356,
fromTransactionCheck=true}, serializeTypeCurrentRPC=JSON]],
],
@RemotingCommand[
SERIALIZE_TYPE_PROPERTY=@String[rocketmq.serialize.type],
SERIALIZE_TYPE_ENV=@String[ROCKETMQ_SERIALIZE_TYPE],
REMOTING_VERSION_KEY=@String[rocketmq.remoting.version],
log=@Slf4jLogger[org.apache.rocketmq.logging.Slf4jLoggerFactory$Slf4jLogger@69060658],
RPC_TYPE=@Integer[0],
RPC_ONEWAY=@Integer[1],
CLASS_HASH_MAP=@HashMap[isEmpty=false;size=31],
CANONICAL_NAME_CACHE=@HashMap[isEmpty=false;size=5],
NULLABLE_FIELD_CACHE=@HashMap[isEmpty=false;size=8],
STRING_CANONICAL_NAME=@String[java.lang.String],
DOUBLE_CANONICAL_NAME_1=@String[java.lang.Double],
DOUBLE_CANONICAL_NAME_2=@String[double],
INTEGER_CANONICAL_NAME_1=@String[java.lang.Integer],
INTEGER_CANONICAL_NAME_2=@String[int],
LONG_CANONICAL_NAME_1=@String[java.lang.Long],
LONG_CANONICAL_NAME_2=@String[long],
BOOLEAN_CANONICAL_NAME_1=@String[java.lang.Boolean],
BOOLEAN_CANONICAL_NAME_2=@String[boolean],
configVersion=@Integer[373],
requestId=@AtomicInteger[-1931087276],
serializeTypeConfigInThisServer=@SerializeType[JSON],
code=@Integer[1],
language=@LanguageCode[JAVA],
version=@Integer[373],
opaque=@Integer[-1931087279],
flag=@Integer[1],
remark=@String[Find prepared transaction message failed],
extFields=null,
customHeader=null,
serializeTypeCurrentRPC=@SerializeType[JSON],
body=null,
],
]
```
`store.log` 也大量出现`selectMappedBuffer request pos invalid`日志:
```
2023-06-23 13:40:05 WARN EndTransactionThread_3 - selectMappedBuffer request
pos invalid, request pos: 630412830, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_1 - selectMappedBuffer request
pos invalid, request pos: 631183337, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_6 - selectMappedBuffer request
pos invalid, request pos: 631185244, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_16 - selectMappedBuffer request
pos invalid, request pos: 631187159, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:34 INFO StoreStatsService - [STORETPS] put_tps
27.1639502716395 get_found_tps 154.8845115488451 get_miss_tps 340.0826584008266
get_transfered_tps 162.05046162050462
2023-06-23 13:40:34 INFO StoreStatsService - [PAGECACHERT] TotalPut 1630,
PutMessageDistributeTime [<=0ms]:1609 [0~10ms]:21 [10~50ms]:0 [50~100ms]:0
[100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0
[5~10s]:0 [10s~]:0
2023-06-23 13:40:35 WARN EndTransactionThread_18 - selectMappedBuffer request
pos invalid, request pos: 632039772, size: 4, fileFromOffset: 36507222016
```
查看源码得知,就是因为`org.apache.rocketmq.store.MappedFile#selectMappedBuffer(int,
int)`查询不到消息,报WARN日志`selectMappedBuffer request pos
invalid`,然后导致`org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffset`返回`Find
prepared transaction message failed`错误。
### Broker 配置
```
serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=rocketmq-cluster
flushDiskType=ASYNC_FLUSH
mapedFileSizeCommitLog=1073741824
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=true
flushConsumerOffsetInterval=5000
checkTransactionMessageEnable=false
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=88
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=true
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=false
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
mapedFileSizeConsumeQueue=300000
listenPort=23301
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=8
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=32
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=ip1;ip2;ip3
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=8
transactionTimeOut=6000
maxMessageSize=65536
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/app/rocketmq
queryMessageThreadPoolNums=16
destroyMapedFileInterval=120000
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=32
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=10
waitTimeMillsInSendQueue=5000
warmMapedFileEnable=false
endTransactionThreadPoolNums=24
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=ip1
brokerIP1=ip2
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
#haListenPort=23311
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/app/rocketmq/data
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/app/rocketmq/data/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=04
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=4
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=120
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8
traceTopicEnable=true
```
GitHub link: https://github.com/apache/rocketmq/discussions/6954
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]