This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1c35adb3dc [ISSUE #9075]Avoid message type validate in message sync
scenario. (#9076)
1c35adb3dc is described below
commit 1c35adb3dc3824ef39f39d1375cf224428fdc4fb
Author: dingshuangxi888 <[email protected]>
AuthorDate: Wed Dec 25 17:57:47 2024 +0800
[ISSUE #9075]Avoid message type validate in message sync scenario. (#9076)
* Avoid message type validate in message sync scenario.
---
.../java/org/apache/rocketmq/common/message/Message.java | 7 +++++++
.../apache/rocketmq/proxy/processor/ProducerProcessor.java | 6 +++++-
.../proxy/remoting/activity/SendMessageActivity.java | 14 ++++++++++----
3 files changed, 22 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index c7997c4731..acd4df96d2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -108,6 +108,13 @@ public class Message implements Serializable {
return this.properties.get(name);
}
+ public boolean hasProperty(final String name) {
+ if (null == this.properties) {
+ return false;
+ }
+ return this.properties.containsKey(name);
+ }
+
public String getTopic() {
return topic;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 43e16ddd2d..17a2f27fa7 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -74,7 +74,7 @@ public class ProducerProcessor extends AbstractProcessor {
try {
Message message = messageList.get(0);
String topic = message.getTopic();
- if
(ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+ if (isNeedCheckTopicMessageType(message)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) &&
!NamespaceUtil.isDLQTopic(topic)) {
@@ -261,4 +261,8 @@ public class ProducerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+ private boolean isNeedCheckTopicMessageType(Message message) {
+ return
ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+ && !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 17af0fdcb3..22d9efd934 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -21,17 +21,18 @@ import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.Map;
import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
-import org.apache.rocketmq.remoting.protocol.RequestCode;
-import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import
org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
public class SendMessageActivity extends AbstractRemotingActivity {
TopicMessageTypeValidator topicMessageTypeValidator;
@@ -66,7 +67,7 @@ public class SendMessageActivity extends
AbstractRemotingActivity {
String topic = requestHeader.getTopic();
Map<String, String> property =
MessageDecoder.string2messageProperties(requestHeader.getProperties());
TopicMessageType messageType =
TopicMessageType.parseFromMessageProperty(property);
- if
(ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+ if (isNeedCheckTopicMessageType(property)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) &&
!NamespaceUtil.isDLQTopic(topic)) {
@@ -87,4 +88,9 @@ public class SendMessageActivity extends
AbstractRemotingActivity {
ProxyContext context) throws Exception {
return request(ctx, request, context,
Duration.ofSeconds(3).toMillis());
}
+
+ private boolean isNeedCheckTopicMessageType(Map<String, String> property) {
+ return
ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+ && !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG);
+ }
}