This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3fe493ec01c64fe714a267565f4e13016ea9c3f6
Merge: 56d2087 446b76b
Author: RongtongJin <[email protected]>
AuthorDate: Tue Mar 8 19:37:53 2022 +0800

    Merge branch 'develop' into 5.0.0-beta
    
    # Conflicts:
    #   acl/pom.xml
    #   broker/pom.xml
    #   client/pom.xml
    #   common/pom.xml
    #   common/src/main/java/org/apache/rocketmq/common/MQVersion.java
    #   distribution/pom.xml
    #   example/pom.xml
    #   filter/pom.xml
    #   logging/pom.xml
    #   namesrv/pom.xml
    #   openmessaging/pom.xml
    #   pom.xml
    #   remoting/pom.xml
    #   srvutil/pom.xml
    #   store/pom.xml
    #   store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
    #   test/pom.xml
    #   tools/pom.xml

 .github/ISSUE_TEMPLATE/issue_template.md           |  23 +-
 README.md                                          |   2 +
 .../rocketmq/acl/plain/PlainAccessValidator.java   |   9 -
 .../rocketmq/acl/plain/PlainPermissionManager.java | 140 ++++----
 .../acl/plain/PlainAccessControlFlowTest.java      | 396 +++++++++++++++++++++
 .../acl/plain/PlainAccessValidatorTest.java        |  60 +++-
 .../conf/acl/plain_acl.yml                         |  11 +-
 .../both_acl_file_folder_conf/conf}/plain_acl.yml  |  25 +-
 .../empty_acl_folder_conf/conf}/plain_acl.yml      |  25 +-
 .../only_acl_folder_conf}/conf/acl/plain_acl.yml   |  11 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   7 +-
 .../ConsumeMessageConcurrentlyService.java         |   2 +-
 .../client/common/ThreadLocalIndexTest.java        |   2 +-
 .../rocketmq/common/topic/TopicValidator.java      |   5 +
 distribution/conf/{acl => }/plain_acl.yml          |   0
 docs/cn/Deployment.md                              |  12 +-
 docs/cn/best_practice.md                           |   2 +-
 .../namespace/PushConsumerWithNamespace.java       |   2 +-
 .../rocketmq/remoting/protocol/LanguageCode.java   |   3 +-
 .../remoting/protocol/LanguageCodeTest.java        |  26 +-
 .../apache/rocketmq/store/StoreStatsService.java   |  74 +++-
 .../rocketmq/store/logfile/DefaultMappedFile.java  |  20 +-
 .../rocketmq/store/StoreStatsServiceTest.java      |  13 +
 .../org/apache/rocketmq/store/StoreTestUtil.java   |  15 +
 .../store/dledger/DLedgerCommitlogTest.java        |   5 +
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   7 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |  15 +
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   4 +
 .../connection/ConsumerConnectionSubCommand.java   |   8 +-
 .../command/consumer/ConsumerStatusSubCommand.java |   8 +-
 .../tools/admin/DefaultMQAdminExtTest.java         |   4 +
 31 files changed, 757 insertions(+), 179 deletions(-)

diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d7f1652,e7e805d..09599d2
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@@ -272,37 -247,8 +272,36 @@@ public class MQClientAPIImpl 
          this.remotingClient.shutdown();
      }
  
 +    public Set<MessageQueueAssignment> queryAssignment(final String addr, 
final String topic,
 +        final String consumerGroup, final String clientId, final String 
strategyName,
 +        final MessageModel messageModel, final long timeoutMillis)
 +        throws RemotingException, MQBrokerException, InterruptedException {
 +        QueryAssignmentRequestBody requestBody = new 
QueryAssignmentRequestBody();
 +        requestBody.setTopic(topic);
 +        requestBody.setConsumerGroup(consumerGroup);
 +        requestBody.setClientId(clientId);
 +        requestBody.setMessageModel(messageModel);
 +        requestBody.setStrategyName(strategyName);
 +
 +        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null);
 +        request.setBody(requestBody.encode());
 +
 +        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
 +            request, timeoutMillis);
 +        switch (response.getCode()) {
 +            case ResponseCode.SUCCESS: {
 +                QueryAssignmentResponseBody queryAssignmentResponseBody = 
QueryAssignmentResponseBody.decode(response.getBody(), 
QueryAssignmentResponseBody.class);
 +                return 
queryAssignmentResponseBody.getMessageQueueAssignments();
 +            }
 +            default:
 +                break;
 +        }
 +
 +        throw new MQBrokerException(response.getCode(), response.getRemark());
 +    }
 +
      public void createSubscriptionGroup(final String addr, final 
SubscriptionGroupConfig config,
-         final long timeoutMillis)
-         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+         final long timeoutMillis) throws RemotingException, 
InterruptedException, MQClientException {
          RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
 null);
  
          byte[] body = RemotingSerializable.encode(config);
@@@ -672,11 -617,11 +671,11 @@@
              String retryBrokerName = brokerName;//by default, it will send to 
the same broker
              if (topicPublishInfo != null) { //select one message queue 
accordingly, in order to determine which broker to send
                  MessageQueue mqChosen = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName);
 -                retryBrokerName = mqChosen.getBrokerName();
 +                retryBrokerName = 
instance.getBrokerNameFromMessageQueue(mqChosen);
              }
              String addr = 
instance.findBrokerAddressInPublish(retryBrokerName);
-             log.warn(String.format("async send msg by retry {} times. 
topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                 retryBrokerName), e);
+             log.warn("async send msg by retry {} times. topic={}, 
brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
+                 retryBrokerName, e);
              try {
                  request.setOpaque(RemotingCommand.createNewRequestId());
                  sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance,
diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index a8ea1c6,b46e7ca..4d4830b
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@@ -36,15 -36,10 +36,16 @@@ import org.apache.rocketmq.common.const
  import org.apache.rocketmq.logging.InternalLogger;
  import org.apache.rocketmq.logging.InternalLoggerFactory;
  import org.apache.rocketmq.common.message.MessageExt;
 -import org.apache.rocketmq.common.message.MessageExtBatch;
 -import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 +import org.apache.rocketmq.store.AppendMessageCallback;
 +import org.apache.rocketmq.store.AppendMessageResult;
 +import org.apache.rocketmq.store.AppendMessageStatus;
 +import org.apache.rocketmq.store.MessageExtBatch;
 +import org.apache.rocketmq.store.MessageExtBrokerInner;
 +import org.apache.rocketmq.store.PutMessageContext;
 +import org.apache.rocketmq.store.SelectMappedBufferResult;
 +import org.apache.rocketmq.store.TransientStorePool;
  import org.apache.rocketmq.store.config.FlushDiskType;
+ import org.apache.rocketmq.store.config.MessageStoreConfig;
  import org.apache.rocketmq.store.util.LibC;
  import sun.nio.ch.DirectBuffer;
  
diff --cc store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 1c0e54c,0e3e01d..89801d5
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@@ -40,9 -39,8 +40,11 @@@ import org.apache.rocketmq.store.PutMes
  import org.junit.Assert;
  import org.junit.Test;
  
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.awaitility.Awaitility.await;
 +
+ import static org.apache.rocketmq.store.StoreTestUtil.releaseMmapFilesOnWindows;
+ 
  public class DLedgerCommitlogTest extends MessageStoreTestBase {
  
  
@@@ -367,9 -368,10 +371,10 @@@
  
  
          DefaultMessageStore followerStore = 
createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
 -        Thread.sleep(2000);
 +        await().atMost(10, SECONDS).until(followerCatchesUp(followerStore, 
topic));
  
          Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
+         Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0));
          Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
  
  

Reply via email to