This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 01a51231b3 [ISSUE #9015] Sync SysFlag and message body inflation
status; allow omit of message body (#9016)
01a51231b3 is described below
commit 01a51231b3e8a0e72b2803d537bfbeb4f8e45719
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Dec 5 15:06:35 2024 +0800
[ISSUE #9015] Sync SysFlag and message body inflation status; allow omit of
message body (#9016)
---
.../client/impl/consumer/ProcessQueue.java | 12 ++++--
.../client/producer/ProduceAccumulator.java | 50 ++++++++++++++--------
.../trace/hook/SendMessageOpenTracingHookImpl.java | 6 +--
.../trace/hook/SendMessageTraceHookImpl.java | 3 +-
.../rocketmq/common/message/MessageDecoder.java | 4 +-
5 files changed, 50 insertions(+), 25 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 33e698b00c..bc1b5eff2f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -137,7 +137,7 @@ public class ProcessQueue {
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
- msgSize.addAndGet(msg.getBody().length);
+ msgSize.addAndGet(null == msg.getBody() ? 0 :
msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
@@ -198,7 +198,10 @@ public class ProcessQueue {
MessageExt prev =
msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
- msgSize.addAndGet(-msg.getBody().length);
+ long bodySize = null == msg.getBody() ? 0 :
msg.getBody().length;
+ if (bodySize > 0) {
+ msgSize.addAndGet(-bodySize);
+ }
}
}
if (msgCount.addAndGet(removedCnt) == 0) {
@@ -270,7 +273,10 @@ public class ProcessQueue {
msgSize.set(0);
} else {
for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
- msgSize.addAndGet(-msg.getBody().length);
+ int bodySize = null == msg.getBody() ? 0 :
msg.getBody().length;
+ if (bodySize > 0) {
+ msgSize.addAndGet(-bodySize);
+ }
}
}
this.consumingMsgOrderlyTreeMap.clear();
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
index 46dfcf71d2..809830e464 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -52,9 +52,9 @@ public class ProduceAccumulator {
private final Logger log =
LoggerFactory.getLogger(DefaultMQProducer.class);
private final GuardForSyncSendService guardThreadForSyncSend;
private final GuardForAsyncSendService guardThreadForAsyncSend;
- private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
- private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
- private AtomicLong currentlyHoldSize = new AtomicLong(0);
+ private final Map<AggregateKey, MessageAccumulation> syncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private final Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private final AtomicLong currentlyHoldSize = new AtomicLong(0);
private final String instanceName;
public ProduceAccumulator(String instanceName) {
@@ -70,11 +70,13 @@ public class ProduceAccumulator {
serviceName = String.format("Client_%s_GuardForSyncSend",
clientInstanceName);
}
- @Override public String getServiceName() {
+ @Override
+ public String getServiceName() {
return serviceName;
}
- @Override public void run() {
+ @Override
+ public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
@@ -115,11 +117,13 @@ public class ProduceAccumulator {
serviceName = String.format("Client_%s_GuardForAsyncSend",
clientInstanceName);
}
- @Override public String getServiceName() {
+ @Override
+ public String getServiceName() {
return serviceName;
}
- @Override public void run() {
+ @Override
+ public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
@@ -276,7 +280,10 @@ public class ProduceAccumulator {
boolean tryAddMessage(Message message) {
synchronized (currentlyHoldSize) {
if (currentlyHoldSize.get() < totalHoldSize) {
- currentlyHoldSize.addAndGet(message.getBody().length);
+ int bodySize = null == message.getBody() ? 0 :
message.getBody().length;
+ if (bodySize > 0) {
+ currentlyHoldSize.addAndGet(bodySize);
+ }
return true;
} else {
return false;
@@ -305,7 +312,8 @@ public class ProduceAccumulator {
this.tag = tag;
}
- @Override public boolean equals(Object o) {
+ @Override
+ public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
@@ -314,7 +322,8 @@ public class ProduceAccumulator {
return waitStoreMsgOK == key.waitStoreMsgOK &&
topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag,
key.tag);
}
- @Override public int hashCode() {
+ @Override
+ public int hashCode() {
return Objects.hash(topic, mq, waitStoreMsgOK, tag);
}
}
@@ -324,7 +333,7 @@ public class ProduceAccumulator {
private LinkedList<Message> messages;
private LinkedList<SendCallback> sendCallbacks;
private Set<String> keys;
- private AtomicBoolean closed;
+ private final AtomicBoolean closed;
private SendResult[] sendResults;
private AggregateKey aggregateKey;
private AtomicInteger messagesSize;
@@ -351,8 +360,7 @@ public class ProduceAccumulator {
return false;
}
- public int add(
- Message msg) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
+ public int add(Message msg) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
int ret = -1;
synchronized (this.closed) {
if (this.closed.get()) {
@@ -360,7 +368,10 @@ public class ProduceAccumulator {
}
ret = this.count++;
this.messages.add(msg);
- messagesSize.addAndGet(msg.getBody().length);
+ int bodySize = null == msg.getBody() ? 0 :
msg.getBody().length;
+ if (bodySize > 0) {
+ messagesSize.addAndGet(bodySize);
+ }
String msgKeys = msg.getKeys();
if (msgKeys != null) {
this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
@@ -388,7 +399,10 @@ public class ProduceAccumulator {
this.count++;
this.messages.add(msg);
this.sendCallbacks.add(sendCallback);
- messagesSize.getAndAdd(msg.getBody().length);
+ int bodySize = null == msg.getBody() ? 0 :
msg.getBody().length;
+ if (bodySize > 0) {
+ messagesSize.addAndGet(bodySize);
+ }
}
if (readyToSend()) {
this.send(sendCallback);
@@ -472,7 +486,8 @@ public class ProduceAccumulator {
if (defaultMQProducer != null) {
final int size = messagesSize.get();
defaultMQProducer.sendDirect(messageBatch,
aggregateKey.mq, new SendCallback() {
- @Override public void onSuccess(SendResult sendResult)
{
+ @Override
+ public void onSuccess(SendResult sendResult) {
try {
splitSendResults(sendResult);
int i = 0;
@@ -490,7 +505,8 @@ public class ProduceAccumulator {
}
}
- @Override public void onException(Throwable e) {
+ @Override
+ public void onException(Throwable e) {
for (SendCallback v : sendCallbacks) {
v.onException(e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
index 3cb6493384..0f828f2b4e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
@@ -48,8 +48,8 @@ public class SendMessageOpenTracingHookImpl implements
SendMessageHook {
}
Message msg = context.getMessage();
Tracer.SpanBuilder spanBuilder = tracer
- .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
- .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+ .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
+ .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new
TextMapAdapter(msg.getProperties()));
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
@@ -62,7 +62,7 @@ public class SendMessageOpenTracingHookImpl implements
SendMessageHook {
span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
span.setTag(TraceConstants.ROCKETMQ_STORE_HOST,
context.getBrokerAddr());
span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE,
context.getMsgType().name());
- span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length);
+ span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, null == msg.getBody()
? 0 : msg.getBody().length);
context.setMqTraceContext(span);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index dba04b593f..61738928bb 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -58,7 +58,8 @@ public class SendMessageTraceHookImpl implements
SendMessageHook {
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
- traceBean.setBodyLength(context.getMessage().getBody().length);
+ int bodyLength = null == context.getMessage().getBody() ? 0 :
context.getMessage().getBody().length;
+ traceBean.setBodyLength(bodyLength);
traceBean.setMsgType(context.getMsgType());
traceContext.getTraceBeans().add(traceBean);
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index f5491e192a..713f9405ea 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -516,13 +516,15 @@ public class MessageDecoder {
}
}
- // uncompress body
+ // inflate body
if (deCompressBody && (sysFlag &
MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
Compressor compressor =
CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(sysFlag));
body = compressor.decompress(body);
+ sysFlag &= ~MessageSysFlag.COMPRESSED_FLAG;
}
msgExt.setBody(body);
+ msgExt.setSysFlag(sysFlag);
} else {
byteBuffer.position(byteBuffer.position() + bodyLen);
}