This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 50b6e79 [Broker] Support disabling non-TLS service ports (#11681) 50b6e79 is described below commit 50b6e79d7cc350efb2208b4aa89f684e133e31c0 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Thu Aug 19 11:36:37 2021 +0300 [Broker] Support disabling non-TLS service ports (#11681) * Support disabling non-tls service ports * Add docs for disabling non-TLS ports * Update site2/docs/security-tls-keystore.md Co-authored-by: Anonymitaet <50226895+anonymit...@users.noreply.github.com> --- .../org/apache/pulsar/broker/PulsarService.java | 7 +++-- .../pulsar/broker/loadbalance/NoopLoadManager.java | 10 +++++-- .../loadbalance/impl/SimpleLoadManagerImpl.java | 2 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 8 ++++-- .../pulsar/broker/service/BrokerServiceTest.java | 32 ++++++++++++++++++++++ .../common/naming/ServiceConfigurationTest.java | 9 ++++++ .../functions/worker/PulsarWorkerService.java | 17 ++++++++++-- .../pulsar/websocket/service/ProxyServer.java | 6 +++- site2/docs/security-tls-keystore.md | 13 +++++++++ 9 files changed, 93 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0c0cec7..6a10daf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1548,12 +1548,15 @@ public class PulsarService implements AutoCloseable { AuthorizationService authorizationService) throws Exception { if (functionWorkerService.isPresent()) { - if (workerConfig.isUseTls()) { + if (workerConfig.isUseTls() || brokerServiceUrl == null) { workerConfig.setPulsarServiceUrl(brokerServiceUrlTls); + } else { + workerConfig.setPulsarServiceUrl(brokerServiceUrl); + } + if (workerConfig.isUseTls() || webServiceAddress == null) { workerConfig.setPulsarWebServiceUrl(webServiceAddressTls); workerConfig.setFunctionWebServiceUrl(webServiceAddressTls); } else { - workerConfig.setPulsarServiceUrl(brokerServiceUrl); workerConfig.setPulsarWebServiceUrl(webServiceAddress); workerConfig.setFunctionWebServiceUrl(webServiceAddress); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index db017c1..85071ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -51,8 +51,7 @@ public class NoopLoadManager implements LoadManager { @Override public void start() throws PulsarServerException { - lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" - + pulsar.getConfiguration().getWebServicePort().get(); + lookupServiceAddress = getBrokerAddress(); localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), new PulsarResourceDescription()); @@ -71,6 +70,13 @@ public class NoopLoadManager implements LoadManager { } } + private String getBrokerAddress() { + return String.format("%s:%s", pulsar.getAdvertisedAddress(), + pulsar.getConfiguration().getWebServicePort().isPresent() + ? pulsar.getConfiguration().getWebServicePort().get() + : pulsar.getConfiguration().getWebServicePortTls().get()); + } + @Override public boolean isCentralized() { return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index a0d2898..5d743fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -1122,7 +1122,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification return String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort().isPresent() ? pulsar.getConfiguration().getWebServicePort().get() - : pulsar.getConfiguration().getWebServicePortTls()); + : pulsar.getConfiguration().getWebServicePortTls().get()); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 37a7f3a..9009eec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -264,13 +264,15 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { } this.pulsar = startBroker(conf); - brokerUrl = new URL(pulsar.getWebServiceAddress()); - brokerUrlTls = new URL(pulsar.getWebServiceAddressTls()); + brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null; + brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null; if (admin != null) { admin.close(); } - PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()); + PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null + ? brokerUrl.toString() + : brokerUrlTls.toString()); customizeNewPulsarAdminBuilder(pulsarAdminBuilder); admin = spy(pulsarAdminBuilder.build()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 9fee5e6..bff415f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -694,6 +694,38 @@ public class BrokerServiceTest extends BrokerTestBase { } } + @Test + public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception { + final String topicName = "persistent://prop/ns-abc/newTopic"; + final String subName = "newSub"; + + conf.setAuthenticationEnabled(false); + conf.setBrokerServicePort(Optional.empty()); + conf.setBrokerServicePortTls(Optional.of(0)); + conf.setWebServicePort(Optional.empty()); + conf.setWebServicePortTls(Optional.of(0)); + conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + conf.setNumExecutorThreadPoolSize(5); + restartBroker(); + + // Access with TLS (Allow insecure TLS connection) + try { + pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true) + .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS) + .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) + .subscribe(); + + } catch (Exception e) { + fail("should not fail"); + } finally { + pulsarClient.close(); + } + } + @SuppressWarnings("deprecation") @Test public void testTlsAuthAllowInsecure() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 452ee96..078ad61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -79,6 +79,15 @@ public class ServiceConfigurationTest { assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5.0)); } + @Test + public void testServicePortsEmpty() throws Exception { + String confFile = "brokerServicePort=\nwebServicePort=\n"; + InputStream stream = new ByteArrayInputStream(confFile.getBytes()); + final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(config.getBrokerServicePort(), Optional.empty()); + assertEquals(config.getWebServicePort(), Optional.empty()); + } + /** * test {@link ServiceConfiguration} with incorrect values. * diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 49e58da..f152adc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -141,7 +141,13 @@ public class PulsarWorkerService implements WorkerService { workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsEnableHostnameVerification()); } else { - return WorkerUtils.getPulsarAdminClient(pulsarServiceUrl); + return WorkerUtils.getPulsarAdminClient( + pulsarServiceUrl, + null, + null, + null, + workerConfig.isTlsAllowInsecureConnection(), + workerConfig.isTlsEnableHostnameVerification()); } } @@ -158,7 +164,14 @@ public class PulsarWorkerService implements WorkerService { workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsEnableHostnameVerification()); } else { - return WorkerUtils.getPulsarClient(pulsarServiceUrl); + return WorkerUtils.getPulsarClient( + pulsarServiceUrl, + null, + null, + null, + null, + workerConfig.isTlsAllowInsecureConnection(), + workerConfig.isTlsEnableHostnameVerification()); } } }; diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java index ae5fd8d..c616db0 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java @@ -22,10 +22,12 @@ import com.google.common.collect.Lists; import java.net.MalformedURLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.TimeZone; +import java.util.stream.Collectors; import javax.servlet.Servlet; import javax.servlet.ServletException; import javax.websocket.DeploymentException; @@ -120,7 +122,9 @@ public class ProxyServer { } public void start() throws PulsarServerException { - log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get()); + log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors()) + .map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString) + .collect(Collectors.joining(","))); RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLog requestLog = new Slf4jRequestLog(); requestLog.setExtended(true); diff --git a/site2/docs/security-tls-keystore.md b/site2/docs/security-tls-keystore.md index 4f432aa..fd5a75a 100644 --- a/site2/docs/security-tls-keystore.md +++ b/site2/docs/security-tls-keystore.md @@ -131,6 +131,19 @@ brokerClientTlsTrustStorePassword=clientpw NOTE: it is important to restrict access to the store files via filesystem permissions. +If you have configured TLS on the broker, to disable non-TLS ports, you can set the values of the following configurations to empty as below. +``` +brokerServicePort= +webServicePort= +``` +In this case, you need to set the following configurations. + +```conf +brokerClientTlsEnabled=true // Set this to true +brokerClientTlsEnabledWithKeyStore=true // Set this to true +brokerClientTlsTrustStore= // Set this to your desired value +brokerClientTlsTrustStorePassword= // Set this to your desired value + Optional settings that may worth consider: 1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end