This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch release-5.0.0 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit e11dc27c495740abdc2c561acb3f5b700259a9af Author: lk <[email protected]> AuthorDate: Fri Sep 16 17:31:27 2022 +0800 [ISSUE #5069] polish the startup of proxy; can specify parameters on the command line of proxy (#5083) undefined --- .licenserc.yaml | 1 + distribution/bin/mqbroker | 35 +++- distribution/bin/mqshutdown | 7 +- distribution/conf/rmq-proxy.json | 2 +- proxy/BUILD.bazel | 2 + .../apache/rocketmq/proxy/CommandLineArgument.java | 56 ++++++ .../org/apache/rocketmq/proxy/ProxyStartup.java | 85 +++++++- .../rocketmq/proxy/config/Configuration.java | 28 +-- .../apache/rocketmq/proxy/config/ProxyConfig.java | 33 ++-- .../proxy/service/mqclient/MQClientAPIFactory.java | 8 +- .../apache/rocketmq/proxy/ProxyStartupTest.java | 220 +++++++++++++++++++++ .../service/message/LocalMessageServiceTest.java | 2 +- .../org.mockito.plugins.MockMaker | 1 + .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 2 +- 14 files changed, 438 insertions(+), 44 deletions(-) diff --git a/.licenserc.yaml b/.licenserc.yaml index 51741f903..0d732fa04 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -35,6 +35,7 @@ header: - '*/src/test/resources/META-INF/service/*' - '*/src/main/resources/META-INF/service/*' - '*/src/test/resources/rmq-proxy-home/conf/rmq-proxy.json' + - '*/src/test/resources/mockito-extensions/*' - '**/target/**' - '**/*.iml' - 'docs/**' diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker index 6a79c392e..17e39f07c 100644 --- a/distribution/bin/mqbroker +++ b/distribution/bin/mqbroker @@ -42,4 +42,37 @@ fi export ROCKETMQ_HOME -sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@ +other_args=" " +enable_proxy=false + +while [[ $# -gt 0 ]]; do + case $1 in + --enable-proxy) + enable_proxy=true + shift + ;; + -c|--configFile) + broker_config="$2" + shift + shift + ;; + *) + other_args=${other_args}" "${1} + shift + ;; + esac +done + +if [ "$enable_proxy" = true ]; then + args_for_proxy=$other_args" -pm local" + if [ "$broker_config" != "" ]; then + args_for_proxy=${args_for_proxy}" -bc "${broker_config} + fi + sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy} +else + args_for_broker=$other_args + if [ "$broker_config" != "" ]; then + args_for_broker=${args_for_broker}" -c "${broker_config} + fi + sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup ${args_for_broker} +fi \ No newline at end of file diff --git a/distribution/bin/mqshutdown b/distribution/bin/mqshutdown index 6006cfeb1..df2da0c31 100644 --- a/distribution/bin/mqshutdown +++ b/distribution/bin/mqshutdown @@ -17,7 +17,12 @@ case $1 in broker) - + pid=`ps ax | grep -i 'org.apache.rocketmq.proxy.ProxyStartup' | grep '\-pm local' |grep java | grep -v grep | awk '{print $1}'` + if [ "$pid" != "" ] ; then + echo "The mqbroker with proxy enable is running(${pid})..." + kill ${pid} + echo "Send shutdown request to mqbroker with proxy enable OK(${pid})" + fi pid=`ps ax | grep -i 'org.apache.rocketmq.broker.BrokerStartup' |grep java | grep -v grep | awk '{print $1}'` if [ -z "$pid" ] ; then echo "No mqbroker running." diff --git a/distribution/conf/rmq-proxy.json b/distribution/conf/rmq-proxy.json index 077404aaa..8e92bb18e 100644 --- a/distribution/conf/rmq-proxy.json +++ b/distribution/conf/rmq-proxy.json @@ -1,3 +1,3 @@ { - + "rocketMQClusterName": "DefaultCluster" } \ No newline at end of file diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel index eba692157..420267ec0 100644 --- a/proxy/BUILD.bazel +++ b/proxy/BUILD.bazel @@ -26,6 +26,7 @@ java_library( "//common", "//client", "//broker", + "//srvutil", "//acl", "@maven//:org_apache_rocketmq_rocketmq_proto", "@maven//:org_apache_commons_commons_lang3", @@ -50,6 +51,7 @@ java_library( "@maven//:ch_qos_logback_logback_classic", "@maven//:com_google_code_findbugs_jsr305", "@maven//:org_checkerframework_checker_qual", + "@maven//:commons_cli_commons_cli", ], ) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java new file mode 100644 index 000000000..0499f2659 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy; + +public class CommandLineArgument { + private String namesrvAddr; + private String brokerConfigPath; + private String proxyConfigPath; + private String proxyMode; + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public String getBrokerConfigPath() { + return brokerConfigPath; + } + + public void setBrokerConfigPath(String brokerConfigPath) { + this.brokerConfigPath = brokerConfigPath; + } + + public String getProxyConfigPath() { + return proxyConfigPath; + } + + public void setProxyConfigPath(String proxyConfigPath) { + this.proxyConfigPath = proxyConfigPath; + } + + public String getProxyMode() { + return proxyMode; + } + + public void setProxyMode(String proxyMode) { + this.proxyMode = proxyMode; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 9be0abe20..f605df0bf 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; +import com.google.common.collect.Lists; import io.grpc.protobuf.services.ChannelzService; import io.grpc.protobuf.services.ProtoReflectionService; import java.util.Date; +import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerStartup; @@ -36,6 +42,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.config.Configuration; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.GrpcServer; @@ -43,6 +50,8 @@ import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder; import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.srvutil.ServerUtil; import org.slf4j.LoggerFactory; public class ProxyStartup { @@ -58,9 +67,9 @@ public class ProxyStartup { public static void main(String[] args) { try { - ConfigurationManager.initEnv(); - initLogger(); - ConfigurationManager.intConfig(); + // parse argument from command line + CommandLineArgument commandLineArgument = parseCommandLineArgument(args); + initLogAndConfiguration(commandLineArgument); // init thread pool monitor for proxy. initThreadPoolMonitor(); @@ -100,7 +109,59 @@ public class ProxyStartup { log.info(new Date() + " rocketmq-proxy startup successfully"); } - private static MessagingProcessor createMessagingProcessor() { + protected static void initLogAndConfiguration(CommandLineArgument commandLineArgument) throws Exception { + if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { + System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); + } + ConfigurationManager.initEnv(); + initLogger(); + ConfigurationManager.intConfig(); + setConfigFromCommandLineArgument(commandLineArgument); + } + + protected static CommandLineArgument parseCommandLineArgument(String[] args) { + CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args, + buildCommandlineOptions(), new DefaultParser()); + if (commandLine == null) { + throw new RuntimeException("parse command line argument failed"); + } + + CommandLineArgument commandLineArgument = new CommandLineArgument(); + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), commandLineArgument); + return commandLineArgument; + } + + private static Options buildCommandlineOptions() { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + + Option opt = new Option("bc", "brokerConfigPath", true, "Broker config file path for local mode"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("pc", "proxyConfigPath", true, "Proxy config file path"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("pm", "proxyMode", true, "Proxy run in local or cluster mode"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) { + if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) { + ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr()); + } + if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) { + ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath()); + } + if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) { + ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode()); + } + } + + protected static MessagingProcessor createMessagingProcessor() { String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); MessagingProcessor messagingProcessor; @@ -112,6 +173,12 @@ public class ProxyStartup { @Override public void start() throws Exception { brokerController.start(); + String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", " + + brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); + if (null != brokerController.getBrokerConfig().getNamesrvAddr()) { + tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr(); + } + log.info(tip); } @Override @@ -134,8 +201,14 @@ public class ProxyStartup { return application; } - private static BrokerController createBrokerController() { - String[] brokerStartupArgs = new String[] {"-c", ConfigurationManager.getProxyConfig().getBrokerConfigPath()}; + protected static BrokerController createBrokerController() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + List<String> brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath()); + if (StringUtils.isNotBlank(config.getNamesrvAddr())) { + brokerStartupArgList.add("-n"); + brokerStartupArgList.add(config.getNamesrvAddr()); + } + String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]); return BrokerStartup.createBrokerController(brokerStartupArgs); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java index 59078c712..9c1ff811b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java @@ -26,6 +26,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,37 +34,38 @@ import org.slf4j.LoggerFactory; public class Configuration { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final AtomicReference<ProxyConfig> proxyConfigReference = new AtomicReference<>(); + public static final String CONFIG_PATH_PROPERTY = "com.rocketmq.proxy.configPath"; public void init() throws Exception { - String proxyConfigData = loadJsonConfig(ProxyConfig.CONFIG_FILE_NAME); - if (null == proxyConfigData) { - throw new RuntimeException(String.format("load configuration from file: %s error.", ProxyConfig.CONFIG_FILE_NAME)); - } + String proxyConfigData = loadJsonConfig(); ProxyConfig proxyConfig = JSON.parseObject(proxyConfigData, ProxyConfig.class); proxyConfig.initData(); setProxyConfig(proxyConfig); } - public static String loadJsonConfig(String configFileName) throws Exception { - final String testResource = "rmq-proxy-home/conf/" + configFileName; - try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) { - if (null != inputStream) { - return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8)); + public static String loadJsonConfig() throws Exception { + String configFileName = ProxyConfig.DEFAULT_CONFIG_FILE_NAME; + String filePath = System.getProperty(CONFIG_PATH_PROPERTY); + if (StringUtils.isBlank(filePath)) { + final String testResource = "rmq-proxy-home/conf/" + configFileName; + try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) { + if (null != inputStream) { + return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8)); + } } + filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString(); } - String filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString(); - File file = new File(filePath); if (!file.exists()) { log.warn("the config file {} not exist", filePath); - return null; + throw new RuntimeException(String.format("the config file %s not exist", filePath)); } long fileLength = file.length(); if (fileLength <= 0) { log.warn("the config file {} length is zero", filePath); - return null; + throw new RuntimeException(String.format("the config file %s length is zero", filePath)); } return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 5a605f28b..00a6cc35e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.proxy.ProxyMode; import org.slf4j.Logger; @@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory; public class ProxyConfig implements ConfigFile { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - public final static String CONFIG_FILE_NAME = "rmq-proxy.json"; + public final static String DEFAULT_CONFIG_FILE_NAME = "rmq-proxy.json"; private static final int PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors(); private String rocketMQClusterName = ""; @@ -44,9 +45,9 @@ public class ProxyConfig implements ConfigFile { private long printJstackInMillis = Duration.ofSeconds(60).toMillis(); private long printThreadPoolStatusInMillis = Duration.ofSeconds(3).toMillis(); - private String nameSrvAddr = ""; - private String nameSrvDomain = ""; - private String nameSrvDomainSubgroup = ""; + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private String namesrvDomain = ""; + private String namesrvDomainSubgroup = ""; /** * gRPC */ @@ -234,28 +235,28 @@ public class ProxyConfig implements ConfigFile { this.printThreadPoolStatusInMillis = printThreadPoolStatusInMillis; } - public String getNameSrvAddr() { - return nameSrvAddr; + public String getNamesrvAddr() { + return namesrvAddr; } - public void setNameSrvAddr(String nameSrvAddr) { - this.nameSrvAddr = nameSrvAddr; + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; } - public String getNameSrvDomain() { - return nameSrvDomain; + public String getNamesrvDomain() { + return namesrvDomain; } - public void setNameSrvDomain(String nameSrvDomain) { - this.nameSrvDomain = nameSrvDomain; + public void setNamesrvDomain(String namesrvDomain) { + this.namesrvDomain = namesrvDomain; } - public String getNameSrvDomainSubgroup() { - return nameSrvDomainSubgroup; + public String getNamesrvDomainSubgroup() { + return namesrvDomainSubgroup; } - public void setNameSrvDomainSubgroup(String nameSrvDomainSubgroup) { - this.nameSrvDomainSubgroup = nameSrvDomainSubgroup; + public void setNamesrvDomainSubgroup(String namesrvDomainSubgroup) { + this.namesrvDomainSubgroup = namesrvDomainSubgroup; } public String getProxyMode() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java index 0b813ae60..9d7db6cf7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java @@ -54,11 +54,11 @@ public class MQClientAPIFactory implements StartAndShutdown { protected void init() { System.setProperty(ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - if (StringUtils.isEmpty(proxyConfig.getNameSrvDomain())) { - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNameSrvAddr()); + if (StringUtils.isEmpty(proxyConfig.getNamesrvDomain())) { + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNamesrvAddr()); } else { - System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNameSrvDomain()); - System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNameSrvDomainSubgroup()); + System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNamesrvDomain()); + System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNamesrvDomainSubgroup()); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java new file mode 100644 index 000000000..6adf7f3fb --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.proxy.config.Configuration; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; +import org.assertj.core.util.Strings; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; + +import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; + +public class ProxyStartupTest { + + public static String mockProxyHome = "/mock/rmq/proxy/home"; + + @Before + public void before() throws Throwable { + URL mockProxyHomeURL = getClass().getClassLoader().getResource("rmq-proxy-home"); + if (mockProxyHomeURL != null) { + mockProxyHome = mockProxyHomeURL.toURI().getPath(); + } + + if (!Strings.isNullOrEmpty(mockProxyHome)) { + System.setProperty(RMQ_PROXY_HOME, mockProxyHome); + } + } + + @After + public void after() { + System.clearProperty(RMQ_PROXY_HOME); + System.clearProperty(MixAll.NAMESRV_ADDR_PROPERTY); + System.clearProperty(Configuration.CONFIG_PATH_PROPERTY); + System.clearProperty(ClientLogger.CLIENT_LOG_USESLF4J); + } + + @Test + public void testParseAndInitCommandLineArgument() throws Exception { + Path configFilePath = Files.createTempFile("testParseAndInitCommandLineArgument", ".json"); + String configData = "{}"; + Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); + + String brokerConfigPath = "brokerConfigPath"; + String proxyConfigPath = configFilePath.toAbsolutePath().toString(); + String proxyMode = "LOCAL"; + String namesrvAddr = "namesrvAddr"; + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-bc", brokerConfigPath, + "-pc", proxyConfigPath, + "-pm", proxyMode, + "-n", namesrvAddr + }); + + assertEquals(brokerConfigPath, commandLineArgument.getBrokerConfigPath()); + assertEquals(proxyConfigPath, commandLineArgument.getProxyConfigPath()); + assertEquals(proxyMode, commandLineArgument.getProxyMode()); + assertEquals(namesrvAddr, commandLineArgument.getNamesrvAddr()); + + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + assertEquals(brokerConfigPath, config.getBrokerConfigPath()); + assertEquals(proxyMode, config.getProxyMode()); + assertEquals(namesrvAddr, config.getNamesrvAddr()); + } + + @Test + public void testLocalModeWithNameSrvAddrByProperty() throws Exception { + String namesrvAddr = "namesrvAddr"; + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-pm", "local" + }); + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + assertEquals(namesrvAddr, config.getNamesrvAddr()); + + validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); + } + + private void validateBrokerCreateArgsWithNamsrvAddr(ProxyConfig config, String namesrvAddr) { + try (MockedStatic<BrokerStartup> brokerStartupMocked = mockStatic(BrokerStartup.class); + MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { + ArgumentCaptor<Object> args = ArgumentCaptor.forClass(Object.class); + brokerStartupMocked.when(() -> BrokerStartup.createBrokerController((String[]) args.capture())) + .thenReturn(mock(BrokerController.class)); + messagingProcessorMocked.when(() -> DefaultMessagingProcessor.createForLocalMode(any(), any())) + .thenReturn(mock(DefaultMessagingProcessor.class)); + + ProxyStartup.createMessagingProcessor(); + String[] passArgs = (String[]) args.getValue(); + assertEquals("-c", passArgs[0]); + assertEquals(config.getBrokerConfigPath(), passArgs[1]); + assertEquals("-n", passArgs[2]); + assertEquals(namesrvAddr, passArgs[3]); + assertEquals(4, passArgs.length); + } + } + + @Test + public void testLocalModeWithNameSrvAddrByConfigFile() throws Exception { + String namesrvAddr = "namesrvAddr"; + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); + Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByConfigFile", ".json"); + String configData = "{\n" + + " \"namesrvAddr\": \"namesrvAddr\"\n" + + "}"; + Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); + + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-pm", "local", + "-pc", configFilePath.toAbsolutePath().toString() + }); + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + assertEquals(namesrvAddr, config.getNamesrvAddr()); + + validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); + } + + @Test + public void testLocalModeWithNameSrvAddrByCommandLine() throws Exception { + String namesrvAddr = "namesrvAddr"; + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); + Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByCommandLine", ".json"); + String configData = "{\n" + + " \"namesrvAddr\": \"foo\"\n" + + "}"; + Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); + + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-pm", "local", + "-pc", configFilePath.toAbsolutePath().toString(), + "-n", namesrvAddr + }); + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + assertEquals(namesrvAddr, config.getNamesrvAddr()); + + validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); + } + + @Test + public void testLocalModeWithAllArgs() throws Exception { + String namesrvAddr = "namesrvAddr"; + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); + Path configFilePath = Files.createTempFile("testLocalMode", ".json"); + String configData = "{\n" + + " \"namesrvAddr\": \"foo\"\n" + + "}"; + Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); + Path brokerConfigFilePath = Files.createTempFile("testLocalModeBrokerConfig", ".json"); + + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-pm", "local", + "-pc", configFilePath.toAbsolutePath().toString(), + "-n", namesrvAddr, + "-bc", brokerConfigFilePath.toAbsolutePath().toString() + }); + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + assertEquals(namesrvAddr, config.getNamesrvAddr()); + assertEquals(brokerConfigFilePath.toAbsolutePath().toString(), config.getBrokerConfigPath()); + + validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); + } + + @Test + public void testClusterMode() throws Exception { + CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { + "-pm", "cluster" + }); + ProxyStartup.initLogAndConfiguration(commandLineArgument); + + try (MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { + DefaultMessagingProcessor processor = mock(DefaultMessagingProcessor.class); + messagingProcessorMocked.when(DefaultMessagingProcessor::createForClusterMode) + .thenReturn(processor); + + assertSame(processor, ProxyStartup.createMessagingProcessor()); + } + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index e3f6edb99..e46464a6d 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -114,7 +114,7 @@ public class LocalMessageServiceTest extends InitConfigAndLoggerTest { @Before public void setUp() throws Throwable { super.before(); - ConfigurationManager.getProxyConfig().setNameSrvAddr("1.1.1.1"); + ConfigurationManager.getProxyConfig().setNamesrvAddr("1.1.1.1"); channelManager = new ChannelManager(); Mockito.when(brokerControllerMock.getSendMessageProcessor()).thenReturn(sendMessageProcessorMock); Mockito.when(brokerControllerMock.getPopMessageProcessor()).thenReturn(popMessageProcessorMock); diff --git a/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index d88b6b1c0..586149cd1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -152,7 +152,7 @@ public class GrpcBaseIT extends BaseConf { ConfigurationManager.initEnv(); ConfigurationManager.intConfig(); - ConfigurationManager.getProxyConfig().setNameSrvAddr(nsAddr); + ConfigurationManager.getProxyConfig().setNamesrvAddr(nsAddr); // Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500); ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
