This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a4a53706329f8b26650978bcd6319dbffd7bb4ed Author: zhouxiang <[email protected]> AuthorDate: Wed Apr 20 20:01:21 2022 +0800 [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent --- .../rocketmq/acl/common/AclClientRPCHook.java | 54 ++-------- .../rocketmq/acl/common/AclClientRPCHookTest.java | 118 +++++++++++++++++++++ 2 files changed, 128 insertions(+), 44 deletions(-) diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java index 9e5bf1fb5..d4452a3f2 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java @@ -16,11 +16,9 @@ */ package org.apache.rocketmq.acl.common; -import java.lang.reflect.Field; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -30,8 +28,6 @@ import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE; public class AclClientRPCHook implements RPCHook { private final SessionCredentials sessionCredentials; - protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache = - new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>(); public AclClientRPCHook(SessionCredentials sessionCredentials) { this.sessionCredentials = sessionCredentials; @@ -39,16 +35,15 @@ public class AclClientRPCHook implements RPCHook { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - byte[] total = AclUtils.combineRequestContent(request, - parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); - String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); - request.addExtField(SIGNATURE, signature); + // Add AccessKey and SecurityToken into signature calculating. request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); - - // The SecurityToken value is unneccessary,user can choose this one. + // The SecurityToken value is unnecessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } + byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request)); + String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); + request.addExtField(SIGNATURE, signature); } @Override @@ -56,40 +51,11 @@ public class AclClientRPCHook implements RPCHook { } - protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) { - CommandCustomHeader header = request.readCustomHeader(); + protected SortedMap<String, String> parseRequestContent(RemotingCommand request) { + request.makeCustomHeaderToNet(); + Map<String, String> extFields = request.getExtFields(); // Sort property - SortedMap<String, String> map = new TreeMap<String, String>(); - map.put(ACCESS_KEY, ak); - if (securityToken != null) { - map.put(SECURITY_TOKEN, securityToken); - } - try { - // Add header properties - if (null != header) { - Field[] fields = fieldCache.get(header.getClass()); - if (null == fields) { - fields = header.getClass().getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - } - Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields); - if (null != tmp) { - fields = tmp; - } - } - - for (Field field : fields) { - Object value = field.get(header); - if (null != value && !field.isSynthetic()) { - map.put(field.getName(), value.toString()); - } - } - } - return map; - } catch (Exception e) { - throw new RuntimeException("incompatible exception.", e); - } + return new TreeMap<String, String>(extFields); } public SessionCredentials getSessionCredentials() { diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java new file mode 100644 index 000000000..8c0d57d62 --- /dev/null +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.acl.common; + +import java.lang.reflect.Field; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestType; +import org.junit.Test; + +import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY; +import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN; +import static org.assertj.core.api.Assertions.assertThat; + +public class AclClientRPCHookTest { + protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache = + new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>(); + private AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(null); + + @Test + public void testParseRequestContent() { + PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); + requestHeader.setConsumerGroup("group"); + requestHeader.setTopic("topic"); + requestHeader.setQueueId(1); + requestHeader.setQueueOffset(2L); + requestHeader.setMaxMsgNums(32); + requestHeader.setSysFlag(0); + requestHeader.setCommitOffset(0L); + requestHeader.setSuspendTimeoutMillis(15000L); + requestHeader.setSubVersion(0L); + requestHeader.setBrokerName("brokerName"); + RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + SortedMap<String, String> oldContent = oldVersionParseRequestContent(testPullRemotingCommand, "ak", null); + byte[] oldBytes = AclUtils.combineRequestContent(testPullRemotingCommand, oldContent); + testPullRemotingCommand.addExtField(ACCESS_KEY, "ak"); + SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand); + byte[] newBytes = AclUtils.combineRequestContent(testPullRemotingCommand, content); + assertThat(newBytes).isEqualTo(oldBytes); + } + + @Test + public void testParseRequestContentWithStreamRequestType() { + PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); + requestHeader.setConsumerGroup("group"); + requestHeader.setTopic("topic"); + requestHeader.setQueueId(1); + requestHeader.setQueueOffset(2L); + requestHeader.setMaxMsgNums(32); + requestHeader.setSysFlag(0); + requestHeader.setCommitOffset(0L); + requestHeader.setSuspendTimeoutMillis(15000L); + requestHeader.setSubVersion(0L); + requestHeader.setBrokerName("brokerName"); + RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + testPullRemotingCommand.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode())); + testPullRemotingCommand.addExtField(ACCESS_KEY, "ak"); + SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand); + assertThat(content.get(MixAll.REQ_T)).isEqualTo(String.valueOf(RequestType.STREAM.getCode())); + } + + private SortedMap<String, String> oldVersionParseRequestContent(RemotingCommand request, String ak, String securityToken) { + CommandCustomHeader header = request.readCustomHeader(); + // Sort property + SortedMap<String, String> map = new TreeMap<String, String>(); + map.put(ACCESS_KEY, ak); + if (securityToken != null) { + map.put(SECURITY_TOKEN, securityToken); + } + try { + // Add header properties + if (null != header) { + Field[] fields = fieldCache.get(header.getClass()); + if (null == fields) { + fields = header.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + } + Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields); + if (null != tmp) { + fields = tmp; + } + } + + for (Field field : fields) { + Object value = field.get(header); + if (null != value && !field.isSynthetic()) { + map.put(field.getName(), value.toString()); + } + } + } + return map; + } catch (Exception e) { + throw new RuntimeException("incompatible exception.", e); + } + } +} \ No newline at end of file
