GitHub user chenzhixiu edited a discussion: 会不会出现commitlog文件写消息时候, maxBlank
小于8,导致写入异常
`public AppendMessageResult doAppend(final long fileFromOffset, final
ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch, PutMessageContext
putMessageContext) {
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// Record ConsumeQueue information
Long queueOffset = messageExtBatch.getQueueOffset();
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
final long beginTimeMills =
CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
int sysFlag = messageExtBatch.getSysFlag();
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) ==
0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysFlag &
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
Supplier<String> msgIdSupplier = () -> {
int msgIdLen = storeHostLength + 8;
int batchCount = putMessageContext.getBatchSize();
long[] phyPosArray = putMessageContext.getPhyPos();
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(),
msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the
buffer
StringBuilder buffer = new StringBuilder(batchCount * msgIdLen
* 2 + batchCount - 1);
for (int i = 0; i < phyPosArray.length; i++) {
msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
String msgId = UtilAll.bytes2string(msgIdBuffer.array());
if (i != 0) {
buffer.append(',');
}
buffer.append(msgId);
}
return buffer.toString();
};
messagesByteBuff.mark();
int index = 0;
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank,
msgIdSupplier, messageExtBatch.getStoreTimestamp(),
beginQueueOffset,
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
int pos = msgPos + 20;
messagesByteBuff.putLong(pos, queueOffset);
pos += 8;
messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen -
msgLen);
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + bornHostLength;
// refresh store time stamp in lock
messagesByteBuff.putLong(pos,
messageExtBatch.getStoreTimestamp());
putMessageContext.getPhyPos()[index++] = wroteOffset +
totalMsgLen - msgLen;
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
}
messagesByteBuff.position(0);
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen,
msgIdSupplier,
messageExtBatch.getStoreTimestamp(), beginQueueOffset,
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
return result;
}
}`
GitHub link: https://github.com/apache/rocketmq/discussions/7043
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]