This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ee5afa5 Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… (#2543) ee5afa5 is described below commit ee5afa5007e260891140956d244a865d73368321 Author: penghui <codelipeng...@gmail.com> AuthorDate: Wed Sep 19 15:44:15 2018 +0800 Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… (#2543) Support build Pulsar client with serviceUrlProvider method. ### Motivation With serviceUrlProvider we can store the pulsar service url in zookeeper or any other config service. And we can watch the service url change event then control the pulsar client, such as change pulsar client serviceUrl, force close client connection or re-connect with new service url. ### Modifications Add ServiceUrlProvider interface. Add forceCloseConnection method in PulsarClient. --- .../pulsar/client/api/ServiceUrlProviderTest.java | 156 +++++++++++++++++++++ .../apache/pulsar/client/api/ClientBuilder.java | 7 + .../org/apache/pulsar/client/api/PulsarClient.java | 22 +++ .../pulsar/client/api/ServiceUrlProvider.java | 42 ++++++ .../pulsar/client/impl/ClientBuilderImpl.java | 20 ++- .../apache/pulsar/client/impl/HandlerState.java | 4 + .../apache/pulsar/client/impl/LookupService.java | 2 +- .../client/impl/PartitionedProducerImpl.java | 4 + .../pulsar/client/impl/PulsarClientImpl.java | 29 +++- .../client/impl/conf/ClientConfigurationData.java | 2 + 10 files changed, 283 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java new file mode 100644 index 0000000..ab86f12 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +public class ServiceUrlProviderTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testCreateClientWithServiceUrlProvider() throws Exception { + + PulsarClient client = PulsarClient.builder() + .serviceUrlProvider(new TestServiceUrlProvider(pulsar.getBrokerServiceUrl())) + .statsInterval(1, TimeUnit.SECONDS) + .build(); + Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider() instanceof TestServiceUrlProvider); + Producer<String> producer = client.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .subscriptionName("my-subscribe") + .subscribe(); + for (int i = 0; i < 100; i++) { + producer.send("Hello Pulsar[" + i + "]"); + } + client.forceCloseConnection(); + for (int i = 100; i < 200; i++) { + producer.send("Hello Pulsar[" + i + "]"); + } + int received = 0; + do { + Message<String> message = consumer.receive(); + System.out.println(message.getValue()); + received++; + } while (received < 200); + Assert.assertEquals(200, received); + producer.close(); + consumer.close(); + client.close(); + } + + @Test + public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception { + + AutoChangedServiceUrlProvider serviceUrlProvider = new AutoChangedServiceUrlProvider(pulsar.getBrokerServiceUrl()); + + PulsarClient client = PulsarClient.builder() + .serviceUrlProvider(serviceUrlProvider) + .statsInterval(1, TimeUnit.SECONDS) + .build(); + Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider() instanceof AutoChangedServiceUrlProvider); + + ProducerImpl<String> producer = (ProducerImpl<String>) client.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .subscriptionName("my-subscribe") + .subscribe(); + + PulsarService pulsarService1 = pulsar; + conf.setBrokerServicePort(PortManager.nextFreePort()); + conf.setWebServicePort(PortManager.nextFreePort()); + startBroker(); + PulsarService pulsarService2 = pulsar; + System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() + ", Pulsar2=" + pulsarService2.getBrokerServiceUrl()); + Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl()); + Assert.assertEquals("pulsar://" + producer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl()); + Assert.assertEquals("pulsar://" + consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl()); + serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl()); + Assert.assertEquals("pulsar://" + producer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl()); + Assert.assertEquals("pulsar://" + consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl()); + producer.close(); + consumer.close(); + client.close(); + } + + class TestServiceUrlProvider implements ServiceUrlProvider { + + private PulsarClient pulsarClient; + + private String serviceUrl; + + public TestServiceUrlProvider(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + @Override + public String getServiceUrl() { + return serviceUrl; + } + + @Override + public void setClient(PulsarClient client) { + this.pulsarClient = client; + } + + public PulsarClient getPulsarClient() { + return pulsarClient; + } + } + + class AutoChangedServiceUrlProvider extends TestServiceUrlProvider { + + public AutoChangedServiceUrlProvider(String serviceUrl) { + super(serviceUrl); + } + + public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientException { + this.getPulsarClient().getConf().setServiceUrl(newServiceUrl); + this.getPulsarClient().reloadLookUp(); + this.getPulsarClient().forceCloseConnection(); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 8aff26f..2831c09 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -81,6 +81,13 @@ public interface ClientBuilder extends Cloneable { ClientBuilder serviceUrl(String serviceUrl); /** + * Configure the service URL provider for Pulsar service + * @param serviceUrlProvider + * @return + */ + ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider); + + /** * Set the authentication provider to use in the Pulsar client instance. * <p> * Example: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 5827573..cba451b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -368,4 +368,26 @@ public interface PulsarClient extends Closeable { * if the forceful shutdown fails */ void shutdown() throws PulsarClientException; + + /** + * Force close connection of pulsar client. + * + * close all producer connection and close all consumer producer. + * + */ + void forceCloseConnection(); + + /** + * Reload lookup service in pulsar client. + * + * @throws PulsarClientException + */ + void reloadLookUp() throws PulsarClientException; + + /** + * Get client config data. + * + * @return + */ + ClientConfigurationData getConf(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java new file mode 100644 index 0000000..9b6e963 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * The provider to provide the service url + * It used by {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)} + */ +public interface ServiceUrlProvider { + + /** + * Get pulsar service url from ServiceUrlProvider. + * + * @return pulsar service url. + */ + String getServiceUrl(); + + /** + * Set pulsar client to the provider for provider can control the pulsar client, + * such as {@link PulsarClient#forceCloseConnection()} or {@link PulsarClient#close()}. + * + * @param client created pulsar client. + */ + void setClient(PulsarClient client); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index e2d5c4c..6a59520 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -21,12 +21,14 @@ package org.apache.pulsar.client.impl; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; @@ -43,11 +45,17 @@ public class ClientBuilderImpl implements ClientBuilder { @Override public PulsarClient build() throws PulsarClientException { + if (conf.getServiceUrlProvider() != null && StringUtils.isNotBlank(conf.getServiceUrlProvider().getServiceUrl())) { + conf.setServiceUrl(conf.getServiceUrlProvider().getServiceUrl()); + } if (conf.getServiceUrl() == null) { - throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object"); + throw new IllegalArgumentException("service URL or service URL provider needs to be specified on the ClientBuilder object"); } - - return new PulsarClientImpl(conf); + PulsarClient client = new PulsarClientImpl(conf); + if (conf.getServiceUrlProvider() != null) { + conf.getServiceUrlProvider().setClient(client); + } + return client; } @Override @@ -72,6 +80,12 @@ public class ClientBuilderImpl implements ClientBuilder { } @Override + public ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider) { + conf.setServiceUrlProvider(serviceUrlProvider); + return this; + } + + @Override public ClientBuilder authentication(Authentication authentication) { conf.setAuthentication(authentication); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java index 6189583..b48e702 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java @@ -66,4 +66,8 @@ abstract class HandlerState { protected State getAndUpdateState(final UnaryOperator<State> updater) { return STATE_UPDATER.getAndUpdate(this, updater); } + + public PulsarClientImpl getClient() { + return client; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 769422b..568c703 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -40,7 +40,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; * </ul> * */ -interface LookupService extends AutoCloseable { +public interface LookupService extends AutoCloseable { /** * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 12ecf2b..8c4387e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -237,6 +237,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class); + protected List<ProducerImpl<T>> getProducers() { + return producers.stream().collect(Collectors.toList()); + } + @Override String getHandlerName() { return "partition-producer"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b87b9bd..5dd0e3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -34,6 +34,7 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -82,7 +83,7 @@ public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); private final ClientConfigurationData conf; - private final LookupService lookup; + private LookupService lookup; private final ConnectionPool cnxPool; private final Timer timer; private final ExecutorProvider externalExecutorProvider; @@ -706,6 +707,19 @@ public class PulsarClientImpl implements PulsarClient { } } + @Override + public void forceCloseConnection() { + for (ConcurrentMap<Integer, CompletableFuture<ClientCnx>> cnxMap : cnxPool.pool.values()) { + for (CompletableFuture<ClientCnx> clientCnxCompletableFuture : cnxMap.values()) { + try { + clientCnxCompletableFuture.get().close(); + } catch (Exception e) { + log.error("Force close connection exception ", e); + } + } + } + } + protected CompletableFuture<ClientCnx> getConnection(final String topic) { TopicName topicName = TopicName.get(topic); return lookup.getBroker(topicName) @@ -745,6 +759,19 @@ public class PulsarClientImpl implements PulsarClient { return lookup; } + public void reloadLookUp() throws PulsarClientException { + if (conf.getServiceUrl().startsWith("http")) { + lookup = new HttpLookupService(conf, eventLoopGroup); + } else { + lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor()); + } + } + + @Override + public ClientConfigurationData getConf() { + return conf; + } + public CompletableFuture<Integer> getNumberOfPartitions(String topic) { return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index b51d61f..1ff24c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.conf; import java.io.Serializable; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -35,6 +36,7 @@ public class ClientConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; private String serviceUrl; + private ServiceUrlProvider serviceUrlProvider; @JsonIgnore private Authentication authentication = new AuthenticationDisabled();