This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 34eb2ce7d17caefea63132a4e30bc425594abddd Author: zhouxiang <[email protected]> AuthorDate: Tue Apr 19 11:16:31 2022 +0800 [ISSUE #3906] Mark stream-related request by RequestType --- .../org/apache/rocketmq/client/ClientConfig.java | 25 ++++++++++++- .../client/consumer/DefaultLitePullConsumer.java | 1 + .../client/consumer/DefaultMQPullConsumer.java | 1 + .../rocketmq/client/impl/MQClientAPIImpl.java | 7 +++- .../java/org/apache/rocketmq/common/MixAll.java | 1 + .../rocketmq/common/rpchook/StreamTypeRPCHook.java | 34 ++++++++++++++++++ .../rocketmq/remoting/protocol/RequestType.java | 41 ++++++++++++++++++++++ 7 files changed, 108 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 4452bbdfa..eeb882673 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.common.utils.NameServerAddressUtils; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RequestType; /** * Client Common configuration @@ -65,6 +66,12 @@ public class ClientConfig { private LanguageCode language = LanguageCode.JAVA; + /** + * Enable stream request type will inject a RPCHook to add corresponding request type to remoting layer. + * And it will also generate a different client id to prevent unexpected reuses of MQClientInstance. + */ + protected boolean enableStreamRequestType = false; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -76,6 +83,11 @@ public class ClientConfig { sb.append(this.unitName); } + if (enableStreamRequestType) { + sb.append("@"); + sb.append(RequestType.STREAM); + } + return sb.toString(); } @@ -160,6 +172,7 @@ public class ClientConfig { this.namespace = cc.namespace; this.language = cc.language; this.mqClientApiTimeout = cc.mqClientApiTimeout; + this.enableStreamRequestType = cc.enableStreamRequestType; } public ClientConfig cloneClientConfig() { @@ -179,6 +192,7 @@ public class ClientConfig { cc.namespace = namespace; cc.language = language; cc.mqClientApiTimeout = mqClientApiTimeout; + cc.enableStreamRequestType = enableStreamRequestType; return cc; } @@ -318,12 +332,21 @@ public class ClientConfig { this.mqClientApiTimeout = mqClientApiTimeout; } + public boolean isEnableStreamRequestType() { + return enableStreamRequestType; + } + + public void setEnableStreamRequestType(boolean enableStreamRequestType) { + this.enableStreamRequestType = enableStreamRequestType; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + + ", enableStreamRequestType=" + enableStreamRequestType + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 8c7f0f0b3..7799166f2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -220,6 +220,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; + this.enableStreamRequestType = true; defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 4784e72e1..5e2138e81 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -117,6 +117,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; + this.enableStreamRequestType = true; defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 245195f07..5d7e1685e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.impl; +import com.alibaba.fastjson.JSON; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -152,6 +153,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerR import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.InvokeCallback; @@ -169,7 +171,6 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -import com.alibaba.fastjson.JSON; public class MQClientAPIImpl { @@ -195,6 +196,10 @@ public class MQClientAPIImpl { this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); this.clientRemotingProcessor = clientRemotingProcessor; + // Inject stream rpc hook first to make reserve field signature + if (clientConfig.isEnableStreamRequestType()) { + this.remotingClient.registerRPCHook(new StreamTypeRPCHook()); + } this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index cf28f2e22..b13a09f92 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -89,6 +89,7 @@ public class MixAll { public static final String REPLY_MESSAGE_FLAG = "reply"; public static final String LMQ_PREFIX = "%LMQ%"; public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ","; + public static final String REQ_T = "ReqT"; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public static String getWSAddr() { diff --git a/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java new file mode 100644 index 000000000..7a74bc52f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.rpchook; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestType; + +public class StreamTypeRPCHook implements RPCHook { + @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + request.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode())); + } + + @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, + RemotingCommand response) { + + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java new file mode 100644 index 000000000..65217d5b8 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol; + +public enum RequestType { + STREAM((byte) 0); + + private final byte code; + + RequestType(byte code) { + this.code = code; + } + + public static RequestType valueOf(byte code) { + for (RequestType requestType : RequestType.values()) { + if (requestType.getCode() == code) { + return requestType; + } + } + return null; + } + + public byte getCode() { + return code; + } +}
