This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-beta in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3fe493ec01c64fe714a267565f4e13016ea9c3f6 Merge: 56d2087 446b76b Author: RongtongJin <[email protected]> AuthorDate: Tue Mar 8 19:37:53 2022 +0800 Merge branch 'develop' into 5.0.0-beta # Conflicts: # acl/pom.xml # broker/pom.xml # client/pom.xml # common/pom.xml # common/src/main/java/org/apache/rocketmq/common/MQVersion.java # distribution/pom.xml # example/pom.xml # filter/pom.xml # logging/pom.xml # namesrv/pom.xml # openmessaging/pom.xml # pom.xml # remoting/pom.xml # srvutil/pom.xml # store/pom.xml # store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java # test/pom.xml # tools/pom.xml .github/ISSUE_TEMPLATE/issue_template.md | 23 +- README.md | 2 + .../rocketmq/acl/plain/PlainAccessValidator.java | 9 - .../rocketmq/acl/plain/PlainPermissionManager.java | 140 ++++---- .../acl/plain/PlainAccessControlFlowTest.java | 396 +++++++++++++++++++++ .../acl/plain/PlainAccessValidatorTest.java | 60 +++- .../conf/acl/plain_acl.yml | 11 +- .../both_acl_file_folder_conf/conf}/plain_acl.yml | 25 +- .../empty_acl_folder_conf/conf}/plain_acl.yml | 25 +- .../only_acl_folder_conf}/conf/acl/plain_acl.yml | 11 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 7 +- .../ConsumeMessageConcurrentlyService.java | 2 +- .../client/common/ThreadLocalIndexTest.java | 2 +- .../rocketmq/common/topic/TopicValidator.java | 5 + distribution/conf/{acl => }/plain_acl.yml | 0 docs/cn/Deployment.md | 12 +- docs/cn/best_practice.md | 2 +- .../namespace/PushConsumerWithNamespace.java | 2 +- .../rocketmq/remoting/protocol/LanguageCode.java | 3 +- .../remoting/protocol/LanguageCodeTest.java | 26 +- .../apache/rocketmq/store/StoreStatsService.java | 74 +++- .../rocketmq/store/logfile/DefaultMappedFile.java | 20 +- .../rocketmq/store/StoreStatsServiceTest.java | 13 + .../org/apache/rocketmq/store/StoreTestUtil.java | 15 + .../store/dledger/DLedgerCommitlogTest.java | 5 + .../rocketmq/tools/admin/DefaultMQAdminExt.java | 7 + .../tools/admin/DefaultMQAdminExtImpl.java | 15 + .../apache/rocketmq/tools/admin/MQAdminExt.java | 4 + .../connection/ConsumerConnectionSubCommand.java | 8 +- .../command/consumer/ConsumerStatusSubCommand.java | 8 +- .../tools/admin/DefaultMQAdminExtTest.java | 4 + 31 files changed, 757 insertions(+), 179 deletions(-) diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index d7f1652,e7e805d..09599d2 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@@ -272,37 -247,8 +272,36 @@@ public class MQClientAPIImpl this.remotingClient.shutdown(); } + public Set<MessageQueueAssignment> queryAssignment(final String addr, final String topic, + final String consumerGroup, final String clientId, final String strategyName, + final MessageModel messageModel, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody(); + requestBody.setTopic(topic); + requestBody.setConsumerGroup(consumerGroup); + requestBody.setClientId(clientId); + requestBody.setMessageModel(messageModel); + requestBody.setStrategyName(strategyName); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null); + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class); + return queryAssignmentResponseBody.getMessageQueueAssignments(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, - final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); byte[] body = RemotingSerializable.encode(config); @@@ -672,11 -617,11 +671,11 @@@ String retryBrokerName = brokerName;//by default, it will send to the same broker if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); - retryBrokerName = mqChosen.getBrokerName(); + retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen); } String addr = instance.findBrokerAddressInPublish(retryBrokerName); - log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - retryBrokerName), e); + log.warn("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, + retryBrokerName, e); try { request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index a8ea1c6,b46e7ca..4d4830b --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@@ -36,15 -36,10 +36,16 @@@ import org.apache.rocketmq.common.const import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageExtBatch; -import org.apache.rocketmq.store.CommitLog.PutMessageContext; +import org.apache.rocketmq.store.AppendMessageCallback; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.MessageExtBatch; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageContext; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.TransientStorePool; import org.apache.rocketmq.store.config.FlushDiskType; + import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.util.LibC; import sun.nio.ch.DirectBuffer; diff --cc store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 1c0e54c,0e3e01d..89801d5 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@@ -40,9 -39,8 +40,11 @@@ import org.apache.rocketmq.store.PutMes import org.junit.Assert; import org.junit.Test; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + + import static org.apache.rocketmq.store.StoreTestUtil.releaseMmapFilesOnWindows; + public class DLedgerCommitlogTest extends MessageStoreTestBase { @@@ -367,9 -368,10 +371,10 @@@ DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0); - Thread.sleep(2000); + await().atMost(10, SECONDS).until(followerCatchesUp(followerStore, topic)); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0)); Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
