merlimat closed pull request #1089: PIP-12 Introduce builder for creating Producer Consumer Reader URL: https://github.com/apache/incubator-pulsar/pull/1089
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1913dd5ff..49213c90d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -22,10 +22,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.AbstractReplicator.State; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; -import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -43,7 +43,7 @@ protected volatile ProducerImpl producer; protected final int producerQueueSize; - protected final ProducerConfiguration producerConfiguration; + protected final ProducerBuilder producerBuilder; protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS); @@ -68,10 +68,11 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerConfiguration = new ProducerConfiguration(); - this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS); - this.producerConfiguration.setMaxPendingMessages(producerQueueSize); - this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster)); + this.producerBuilder = client.newProducer() // + .topic(topicName) + .sendTimeout(0, TimeUnit.SECONDS) // + .maxPendingMessages(producerQueueSize) // + .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); } @@ -83,10 +84,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca protected abstract void disableReplicatorRead(); - public ProducerConfiguration getProducerConfiguration() { - return producerConfiguration; - } - public String getRemoteCluster() { return remoteCluster; } @@ -121,7 +118,7 @@ public synchronized void startProducer() { } log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); - client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> { + producerBuilder.createAsync().thenAccept(producer -> { readEntries(producer); }).exceptionally(ex -> { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index d914aa7ec..e9219a614 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -52,7 +52,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St BrokerService brokerService) { super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService); - producerConfiguration.setBlockIfQueueFull(false); + producerBuilder.blockIfQueueFull(false); startProducer(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 55019a0d6..a8ec9400c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1149,17 +1149,17 @@ public void testClosingReplicationProducerTwice() throws Exception { brokerService.getReplicationClients().put(remoteCluster, client); PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); - doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any()); replicator.startProducer(); - verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + verify(clientImpl).createProducerAsync(matches(globalTopicName), any()); replicator.disconnect(false); replicator.disconnect(false); replicator.startProducer(); - verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any()); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java new file mode 100644 index 000000000..b832e59c9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; + +/** + * Builder interface that is used to construct a {@link PulsarClient} instance. + * + * @since 2.0.0 + */ +public interface ClientBuilder extends Serializable, Cloneable { + + /** + * @return the new {@link PulsarClient} instance + */ + PulsarClient build() throws PulsarClientException; + + /** + * Create a copy of the current client builder. + * <p> + * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + * <pre> + * ClientBuilder builder = PulsarClient.builder().ioThreads(8).listenerThreads(4); + * + * PulsarClient client1 = builder.clone().serviceUrl(URL_1).build(); + * PulsarClient client2 = builder.clone().serviceUrl(URL_2).build(); + * </pre> + */ + ClientBuilder clone(); + + /** + * Configure the service URL for the Pulsar service. + * <p> + * This parameter is required + * + * @param serviceUrl + * @return + */ + ClientBuilder serviceUrl(String serviceUrl); + + /** + * Set the authentication provider to use in the Pulsar client instance. + * <p> + * Example: + * <p> + * + * <pre> + * <code> + * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls"; + * + * Map<String, String> conf = new TreeMap<>(); + * conf.put("tlsCertFile", "/my/cert/file"); + * conf.put("tlsKeyFile", "/my/key/file"); + * + * Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf); + * + * PulsarClient client = PulsarClient.builder() + * .serviceUrl(SERVICE_URL) + * .authentication(auth) + * .build(); + * .... + * </code> + * </pre> + * + * @param authentication + * an instance of the {@link Authentication} provider already constructed + */ + ClientBuilder authentication(Authentication authentication); + + /** + * Set the authentication provider to use in the Pulsar client instance. + * <p> + * Example: + * <p> + * + * <pre> + * <code> + * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls"; + * String AUTH_PARAMS = "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file"; + * + * PulsarClient client = PulsarClient.builder() + * .serviceUrl(SERVICE_URL) + * .authentication(AUTH_CLASS, AUTH_PARAMS) + * .build(); + * .... + * </code> + * </pre> + * + * @param authPluginClassName + * name of the Authentication-Plugin you want to use + * @param authParamsString + * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2" + * @throws UnsupportedAuthenticationException + * failed to instantiate specified Authentication-Plugin + */ + ClientBuilder authentication(String authPluginClassName, String authParamsString) + throws UnsupportedAuthenticationException; + + /** + * Set the authentication provider to use in the Pulsar client instance. + * <p> + * Example: + * <p> + * + * <pre> + * <code> + * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls"; + * + * Map<String, String> conf = new TreeMap<>(); + * conf.put("tlsCertFile", "/my/cert/file"); + * conf.put("tlsKeyFile", "/my/key/file"); + * + * PulsarClient client = PulsarClient.builder() + * .serviceUrl(SERVICE_URL) + * .authentication(AUTH_CLASS, conf) + * .build(); + * .... + * </code> + * + * @param authPluginClassName + * name of the Authentication-Plugin you want to use + * @param authParams + * map which represents parameters for the Authentication-Plugin + * @throws UnsupportedAuthenticationException + * failed to instantiate specified Authentication-Plugin + */ + ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) + throws UnsupportedAuthenticationException; + + /** + * Set the operation timeout <i>(default: 30 seconds)</i> + * <p> + * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the + * operation will be maked as failed + * + * @param operationTimeout + * operation timeout + * @param unit + * time unit for {@code operationTimeout} + */ + ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit); + + /** + * Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i> + * + * @param numIoThreads + */ + ClientBuilder ioThreads(int numIoThreads); + + /** + * Set the number of threads to be used for message listeners <i>(default: 1 thread)</i> + * + * @param numListenerThreads + */ + ClientBuilder listenerThreads(int numListenerThreads); + + /** + * Sets the max number of connection that the client library will open to a single broker. + * <p> + * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this + * parameter may improve throughput when using many producers over a high latency connection. + * <p> + * + * @param connectionsPerBroker + * max number of connections per broker (needs to be greater than 0) + */ + ClientBuilder connectionsPerBroker(int connectionsPerBroker); + + /** + * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm. + * <p> + * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve + * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall + * throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false. + * <p> + * Default value is true + * + * @param enableTcpNoDelay + */ + ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay); + + /** + * Configure whether to use TLS encryption on the connection <i>(default: false)</i> + * + * @param enableTls + */ + ClientBuilder enableTls(boolean enableTls); + + /** + * Set the path to the trusted TLS certificate file + * + * @param tlsTrustCertsFilePath + */ + ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath); + + /** + * Configure whether the Pulsar client accept untrusted TLS certificate from broker <i>(default: false)</i> + * + * @param allowTlsInsecureConnection + */ + ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection); + + /** + * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509 + * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. + * Server Identity hostname verification. + * + * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a> + * + * @param enableTlsHostnameVerification + */ + ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification); + + /** + * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive + * statsIntervalSeconds It should be set to at least 1 second + * + * @param statsIntervalSeconds + * the interval between each stat info + * @param unit + * time unit for {@code statsInterval} + */ + ClientBuilder statsInterval(long statsInterval, TimeUnit unit); + + /** + * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. + * <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe + * on thousands of topic using created {@link PulsarClient} + * + * @param maxConcurrentLookupRequests + */ + ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests); + + /** + * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection + * will be closed and client creates a new connection that give chance to connect a different broker <i>(default: + * 50)</i> + * + * @param maxNumberOfRejectedRequestPerConnection + */ + ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java index 9ab7b32b1..14f94da5a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java @@ -30,14 +30,13 @@ /** * Class used to specify client side configuration like authentication, etc.. * - * + * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance */ +@Deprecated public class ClientConfiguration implements Serializable { - /** - * - */ private static final long serialVersionUID = 1L; + private Authentication authentication = new AuthenticationDisabled(); private long operationTimeoutMs = 30000; private long statsIntervalSeconds = 60; @@ -221,8 +220,7 @@ public int getConnectionsPerBroker() { * max number of connections per broker (needs to be greater than 0) */ public void setConnectionsPerBroker(int connectionsPerBroker) { - checkArgument(connectionsPerBroker > 0, - "Connections per broker need to be greater than 0"); + checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0"); this.connectionsPerBroker = connectionsPerBroker; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java new file mode 100644 index 000000000..a2c3c8133 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}. + * + * @see PulsarClient#newConsumer() + * + * @since 2.0.0 + */ +public interface ConsumerBuilder extends Serializable, Cloneable { + + /** + * Create a copy of the current consumer builder. + * <p> + * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + * <pre> + * ConsumerBuilder builder = client.newConsumer() // + * .subscriptionName("my-subscription-name") // + * .subscriptionType(SubscriptionType.Shared) // + * .receiverQueueSize(10); + * + * Consumer consumer1 = builder.clone().topic(TOPIC_1).subscribe(); + * Consumer consumer2 = builder.clone().topic(TOPIC_2).subscribe(); + * </pre> + */ + ConsumerBuilder clone(); + + /** + * Finalize the {@link Consumer} creation by subscribing to the topic. + * + * <p> + * If the subscription does not exist, a new subscription will be created and all messages published after the + * creation will be retained until acknowledged, even if the consumer is not connected. + * + * @return the {@link Consumer} instance + * @throws PulsarClientException + * if the the subscribe operation fails + */ + Consumer subscribe() throws PulsarClientException; + + /** + * Finalize the {@link Consumer} creation by subscribing to the topic in asynchronous mode. + * + * <p> + * If the subscription does not exist, a new subscription will be created and all messages published after the + * creation will be retained until acknowledged, even if the consumer is not connected. + * + * @return a future that will yield a {@link Consumer} instance + * @throws PulsarClientException + * if the the subscribe operation fails + */ + CompletableFuture<Consumer> subscribeAsync(); + + /** + * Specify the topic this consumer will subscribe on. + * <p> + * This argument is required when constructing the consumer. + * + * @param topicName + */ + ConsumerBuilder topic(String topicName); + + /** + * Specify the subscription name for this consumer. + * <p> + * This argument is required when constructing the consumer. + * + * @param subscriptionName + */ + ConsumerBuilder subscriptionName(String subscriptionName); + + /** + * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than + * 10 seconds. + * + * @param ackTimeout + * for unacked messages. + * @param timeUnit + * unit in which the timeout is provided. + * @return {@link ConsumerConfiguration} + */ + ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit); + + /** + * Select the subscription type to be used when subscribing to the topic. + * <p> + * Default is {@link SubscriptionType#Exclusive} + * + * @param subscriptionType + * the subscription type value + */ + ConsumerBuilder subscriptionType(SubscriptionType subscriptionType); + + /** + * Sets a {@link MessageListener} for the consumer + * <p> + * When a {@link MessageListener} is set, application will receive messages through it. Calls to + * {@link Consumer#receive()} will not be allowed. + * + * @param messageListener + * the listener object + */ + ConsumerBuilder messageListener(MessageListener messageListener); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Sets the ConsumerCryptoFailureAction to the value specified + * + * @param action + * The consumer action + */ + ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); + + /** + * Sets the size of the consumer receive queue. + * <p> + * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the + * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer + * throughput at the expense of bigger memory utilization. + * </p> + * <p> + * <b>Setting the consumer queue size as zero</b> + * <ul> + * <li>Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the + * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process + * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue + * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is + * zero.</li> + * <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with + * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives + * exception in callback. <b> consumer will not be able receive any further message unless batch-message in pipeline + * is removed</b></li> + * </ul> + * </p> + * Default value is {@code 1000} messages and should be good for most use cases. + * + * @param receiverQueueSize + * the new receiver queue size value + */ + ConsumerBuilder receiverQueueSize(int receiverQueueSize); + + /** + * Set the max total receiver queue size across partitons. + * <p> + * This setting will be used to reduce the receiver queue size for individual partitions + * {@link #receiverQueueSize(int)} if the total exceeds this value (default: 50000). + * + * @param maxTotalReceiverQueueSizeAcrossPartitions + */ + ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions); + + /** + * Set the consumer name. + * + * @param consumerName + */ + ConsumerBuilder consumerName(String consumerName); + + /** + * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog + * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for + * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that + * point, the messages will be sent as normal. + * + * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e. + * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a + * shared subscription, will lead to the subscription call throwing a PulsarClientException. + * + * @param readCompacted + * whether to read from the compacted topic + */ + ConsumerBuilder readCompacted(boolean readCompacted); + + /** + * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching + * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br> + * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have + * permits, else broker will consider next priority level consumers. </br> + * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch + * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B. + * + * <pre> + * Consumer PriorityLevel Permits + * C1 0 2 + * C2 0 1 + * C3 0 1 + * C4 1 2 + * C5 1 1 + * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4 + * </pre> + * + * @param priorityLevel + */ + ConsumerBuilder priorityLevel(int priorityLevel); + + /** + * Set a name/value property with this consumer. + * + * @param key + * @param value + * @return + */ + ConsumerBuilder property(String key, String value); + + /** + * Add all the properties in the provided map + * + * @param properties + * @return + */ + ConsumerBuilder properties(Map<String, String> properties); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 00e4537ad..5f25fa83b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -31,8 +31,9 @@ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion. * - * + * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance */ +@Deprecated public class ConsumerConfiguration implements Serializable { /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java index 46496b7b7..4acb6e9cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java @@ -18,34 +18,33 @@ */ package org.apache.pulsar.client.api; -import java.util.List; import java.util.Map; public interface CryptoKeyReader { - /* + /** * Return the encryption key corresponding to the key name in the argument * <p> - * This method should be implemented to return the EncryptionKeyInfo. This method will be - * called at the time of producer creation as well as consumer receiving messages. - * Hence, application should not make any blocking calls within the implementation. + * This method should be implemented to return the EncryptionKeyInfo. This method will be called at the time of + * producer creation as well as consumer receiving messages. Hence, application should not make any blocking calls + * within the implementation. * <p> - * - * @param keyName - * Unique name to identify the key - * @param metadata - * Additional information needed to identify the key - * @return EncryptionKeyInfo with details about the public key - * */ + * + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @return EncryptionKeyInfo with details about the public key + */ EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata); - /* - * @param keyName - * Unique name to identify the key - * @param metadata - * Additional information needed to identify the key - * @return byte array of the private key value - */ + /** + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @return byte array of the private key value + */ EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata); - + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java index 25c89755b..a9cef6f73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java @@ -23,7 +23,7 @@ public interface MessageRouter extends Serializable { /** - * + * * @param msg * Message object * @return The index of the partition to use for the message @@ -42,7 +42,6 @@ default int choosePartition(Message msg) { * @return the partition to route the message. * @since 1.22.0 */ - @SuppressWarnings("deprecation") default int choosePartition(Message msg, TopicMetadata metadata) { return choosePartition(msg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java new file mode 100644 index 000000000..1d45489b9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +public enum MessageRoutingMode { + SinglePartition, RoundRobinPartition, CustomPartition +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java new file mode 100644 index 000000000..5ab1215dc --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError; + +/** + * {@link ProducerBuilder} is used to configure and create instances of {@link Producer}. + * + * @see PulsarClient#newProducer() + */ +public interface ProducerBuilder extends Serializable, Cloneable { + + /** + * Finalize the creation of the {@link Producer} instance. + * <p> + * This method will block until the producer is created successfully. + * + * @return the producer instance + * @throws PulsarClientException.ProducerBusyException + * if a producer with the same "producer name" is already connected to the topic + * @throws PulsarClientException + * if the producer creation fails + */ + Producer create() throws PulsarClientException; + + /** + * Finalize the creation of the {@link Producer} instance in asynchronous mode. + * <p> + * This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. + * + * @return a future that will yield the created producer instance + * @throws PulsarClientException.ProducerBusyException + * if a producer with the same "producer name" is already connected to the topic + * @throws PulsarClientException + * if the producer creation fails + */ + CompletableFuture<Producer> createAsync(); + + /** + * Create a copy of the current {@link ProducerBuilder}. + * <p> + * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + * <pre> + * ProducerBuilder builder = client.newProducer().sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true); + * + * Producer producer1 = builder.clone().topic(TOPIC_1).create(); + * Producer producer2 = builder.clone().topic(TOPIC_2).create(); + * </pre> + */ + ProducerBuilder clone(); + + /** + * Specify the topic this producer will be publishing on. + * <p> + * This argument is required when constructing the produce. + * + * @param topicName + */ + ProducerBuilder topic(String topicName); + + /** + * Specify a name for the producer + * <p> + * If not assigned, the system will generate a globally unique name which can be access with + * {@link Producer#getProducerName()}. + * <p> + * When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique + * across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on + * a topic. + * + * @param producerName + * the custom name to use for the producer + */ + ProducerBuilder producerName(String producerName); + + /** + * Set the send timeout <i>(default: 30 seconds)</i> + * <p> + * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported. + * + * @param sendTimeout + * the send timeout + * @param unit + * the time unit of the {@code sendTimeout} + */ + ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit); + + /** + * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. + * <p> + * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail + * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * + * @param maxPendingMessages + * @return + */ + ProducerBuilder maxPendingMessages(int maxPendingMessages); + + /** + * Set the number of max pending messages across all the partitions + * <p> + * This setting will be used to lower the max pending messages for each partition + * ({@link #maxPendingMessages(int)}), if the total exceeds the configured value. + * + * @param maxPendingMessagesAcrossPartitions + */ + ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions); + + /** + * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing + * message queue is full. + * <p> + * Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with + * {@link ProducerQueueIsFullError} when there is no space left in pending queue. + * + * @param blockIfQueueFull + * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full + * @return + */ + ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull); + + /** + * Set the message routing mode for the partitioned producer + * + * @param mode + * @return + */ + ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode); + + /** + * Set the compression type for the producer. + * <p> + * By default, message payloads are not compressed. Supported compression types are: + * <ul> + * <li><code>CompressionType.LZ4</code></li> + * <li><code>CompressionType.ZLIB</code></li> + * </ul> + * + * @param compressionType + * @return + */ + ProducerBuilder compressionType(CompressionType compressionType); + + /** + * Set a custom message routing policy by passing an implementation of MessageRouter + * + * + * @param messageRouter + */ + ProducerBuilder messageRouter(MessageRouter messageRouter); + + /** + * Control whether automatic batching of messages is enabled for the producer. <i>default: false [No batching]</i> + * + * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the + * broker, leading to better throughput, especially when publishing small messages. If compression is enabled, + * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or + * contents. + * + * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages + * + * @see #batchingMaxPublishDelay(long, TimeUnit) + */ + ProducerBuilder enableBatching(boolean enableBatching); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Add public encryption key, used by producer to encrypt the data key. + * + * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are + * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application + * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted + * after compression. If batch messaging is enabled, the batched message is encrypted. + * + */ + ProducerBuilder addEncryptionKey(String key); + + /** + * Sets the ProducerCryptoFailureAction to the value specified + * + * @param The + * producer action + */ + ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action); + + /** + * Set the time period within which the messages sent will be batched <i>default: 10ms</i> if batch messages are + * enabled. If set to a non zero value, messages will be queued until this time interval or until + * + * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single + * batch message. The consumer will be delivered individual messages in the batch in the same order they were + * enqueued + * @param batchDelay + * the batch delay + * @param timeUnit + * the time unit of the {@code batchDelay} + * @return + */ + ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit); + + /** + * Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1, + * messages will be queued until this threshold is reached or batch interval has elapsed + * + * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as + * a single batch message. The consumer will be delivered individual messages in the batch in the same order + * they were enqueued + * @param batchMessagesMaxMessagesPerBatch + * maximum number of messages in a batch + * @return + */ + ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch); + + /** + * Set the baseline for the sequence ids for messages published by the producer. + * <p> + * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned + * incremental sequence ids, if not otherwise specified. + * + * @param initialSequenceId + * @return + */ + ProducerBuilder initialSequenceId(long initialSequenceId); + + /** + * Set a name/value property with this producer. + * + * @param key + * @param value + * @return + */ + ProducerBuilder property(String key, String value); + + /** + * Add all the properties in the provided map + * + * @param properties + * @return + */ + ProducerBuilder properties(Map<String, String> properties); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index a6d2a5583..be72fc7e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; -import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import com.google.common.base.Objects; @@ -36,7 +35,9 @@ /** * Producer's configuration * + * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance */ +@Deprecated public class ProducerConfiguration implements Serializable { private static final long serialVersionUID = 1L; @@ -268,7 +269,7 @@ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) { * * @return message router. * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already passed as parameter in - * {@link MessageRouter#choosePartition(Message, TopicMetadata)}. + * {@link MessageRouter#choosePartition(Message, TopicMetadata)}. * @see MessageRouter */ @Deprecated @@ -338,7 +339,7 @@ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) * @return encryptionKeys * */ - public ConcurrentOpenHashSet<String> getEncryptionKeys() { + public ConcurrentOpenHashSet<String> getEncryptionKeys() { return this.encryptionKeys; } @@ -354,16 +355,15 @@ public boolean isEncryptionEnabled() { /** * Add public encryption key, used by producer to encrypt the data key. * - * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. - * If keys are found, a callback getKey(String keyName) is invoked against each key to load - * the values of the key. Application should implement this callback to return the key in pkcs8 format. - * If compression is enabled, message is encrypted after compression. - * If batch messaging is enabled, the batched message is encrypted. + * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are + * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application + * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted + * after compression. If batch messaging is enabled, the batched message is encrypted. * */ public void addEncryptionKey(String key) { if (this.encryptionKeys == null) { - this.encryptionKeys = new ConcurrentOpenHashSet<String>(16,1); + this.encryptionKeys = new ConcurrentOpenHashSet<String>(16, 1); } this.encryptionKeys.add(key); } @@ -377,7 +377,8 @@ public void removeEncryptionKey(String key) { /** * Sets the ProducerCryptoFailureAction to the value specified * - * @param The producer action + * @param action + * The producer action */ public void setCryptoFailureAction(ProducerCryptoFailureAction action) { cryptoFailureAction = action; @@ -467,6 +468,7 @@ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) { /** * Set a name/value property with this producer. + * * @param key * @param value * @return @@ -480,6 +482,7 @@ public ProducerConfiguration setProperty(String key, String value) { /** * Add all the properties in the provided map + * * @param properties * @return */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index e15db40a6..6ba2518d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -21,15 +21,28 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; /** - * Class that provides a client interface to Pulsar - * - * + * Class that provides a client interface to Pulsar. + * <p> + * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and + * {@link Reader} instances. */ public interface PulsarClient extends Closeable { + /** + * Get a new builder instance that can used to configure and build a {@link PulsarClient} instance. + * + * @return the {@link ClientBuilder} + * + * @since 2.0.0 + */ + public static ClientBuilder builder() { + return new ClientBuilderImpl(); + } + /** * Create a new PulsarClient object using default client configuration * @@ -38,7 +51,9 @@ * @return a new pulsar client object * @throws PulsarClientException.InvalidServiceURL * if the serviceUrl is invalid + * @deprecated use {@link #builder()} to construct a client instance */ + @Deprecated public static PulsarClient create(String serviceUrl) throws PulsarClientException { return create(serviceUrl, new ClientConfiguration()); } @@ -53,11 +68,50 @@ public static PulsarClient create(String serviceUrl) throws PulsarClientExceptio * @return a new pulsar client object * @throws PulsarClientException.InvalidServiceURL * if the serviceUrl is invalid + * @deprecated use {@link #builder()} to construct a client instance */ + @Deprecated public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException { return new PulsarClientImpl(serviceUrl, conf); } + /** + * Create a producer with default for publishing on a specific topic + * <p> + * Example: + * + * <code> + * Producer producer = client.newProducer().topic(myTopic).create(); + * </code> + * + * + * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance + * + * @since 2.0.0 + */ + ProducerBuilder newProducer(); + + /** + * Create a producer with default for publishing on a specific topic + * + * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance + * + * @since 2.0.0 + */ + ConsumerBuilder newConsumer(); + + /** + * Create a topic reader for reading messages from the specified topic. + * <p> + * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a + * subscription. Reader can only work on non-partitioned topics. + * + * @return a {@link ReaderBuilder} that can be used to configure and construct a {@link Reader} instance + * + * @since 2.0.0 + */ + ReaderBuilder newReader(); + /** * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic * @@ -72,7 +126,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * if there was an error with the supplied credentials * @throws PulsarClientException.AuthorizationException * if the authorization to publish on topic was denied + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated Producer createProducer(String topic) throws PulsarClientException; /** @@ -81,7 +137,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param topic * The name of the topic where to produce * @return Future of the asynchronously created producer object + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated CompletableFuture<Producer> createProducerAsync(String topic); /** @@ -95,7 +153,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @throws PulsarClientException * if it was not possible to create the producer * @throws InterruptedException + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException; /** @@ -106,7 +166,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ProducerConfiguration} object * @return Future of the asynchronously created producer object + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf); /** @@ -119,7 +181,10 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @return The {@code Consumer} object * @throws PulsarClientException * @throws InterruptedException + * + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated Consumer subscribe(String topic, String subscription) throws PulsarClientException; /** @@ -131,7 +196,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param subscription * The subscription name * @return Future of the {@code Consumer} object + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated CompletableFuture<Consumer> subscribeAsync(String topic, String subscription); /** @@ -145,7 +212,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * The {@code ConsumerConfiguration} object * @return The {@code Consumer} object * @throws PulsarClientException + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException; /** @@ -159,7 +228,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ConsumerConfiguration} object * @return Future of the {@code Consumer} object + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf); /** @@ -185,7 +256,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ReaderConfiguration} object * @return The {@code Reader} object + * @deprecated Use {@link #newReader()} to build a new reader */ + @Deprecated Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException; /** @@ -212,7 +285,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ReaderConfiguration} object * @return Future of the asynchronously created producer object + * @deprecated Use {@link #newReader()} to build a new reader */ + @Deprecated CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf); /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java new file mode 100644 index 000000000..196af2057 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +/** + * {@link ReaderBuilder} is used to configure and create instances of {@link Reader}. + * + * @see PulsarClient#newReader() + * + * @since 2.0.0 + */ +public interface ReaderBuilder extends Serializable, Cloneable { + + /** + * Finalize the creation of the {@link Reader} instance. + * + * <p> + * This method will block until the reader is created successfully. + * + * @return the reader instance + * @throws PulsarClientException + * if the reader creation fails + */ + Reader create() throws PulsarClientException; + + /** + * Finalize the creation of the {@link Reader} instance in asynchronous mode. + * + * <p> + * This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. + * + * @return the reader instance + * @throws PulsarClientException + * if the reader creation fails + */ + CompletableFuture<Reader> createAsync(); + + /** + * Create a copy of the current {@link ReaderBuilder}. + * <p> + * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + * <pre> + * ReaderBuilder builder = client.newReader().readerName("my-reader").receiverQueueSize(10); + * + * Reader reader1 = builder.clone().topic(TOPIC_1).create(); + * Reader reader2 = builder.clone().topic(TOPIC_2).create(); + * </pre> + */ + ReaderBuilder clone(); + + /** + * Specify the topic this consumer will subscribe on. + * <p> + * This argument is required when constructing the consumer. + * + * @param topicName + */ + ReaderBuilder topic(String topicName); + + /** + * The initial reader positioning is done by specifying a message id. The options are: + * <ul> + * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic + * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the + * reader was created + * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that + * specific position. The first message to be read will be the message next to the specified messageId. + * </ul> + */ + ReaderBuilder startMessageId(MessageId startMessageId); + + /** + * Sets a {@link ReaderListener} for the reader + * <p> + * When a {@link ReaderListener} is set, application will receive messages through it. Calls to + * {@link Reader#readNext()} will not be allowed. + * + * @param readerListener + * the listener object + */ + ReaderBuilder readerListener(ReaderListener readerListener); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Sets the ConsumerCryptoFailureAction to the value specified + * + * @param action + * The action to take when the decoding fails + */ + ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); + + /** + * Sets the size of the consumer receive queue. + * <p> + * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the + * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer + * throughput at the expense of bigger memory utilization. + * </p> + * Default value is {@code 1000} messages and should be good for most use cases. + * + * @param receiverQueueSize + * the new receiver queue size value + */ + ReaderBuilder receiverQueueSize(int receiverQueueSize); + + /** + * Set the reader name. + * + * @param readerName + */ + ReaderBuilder readerName(String readerName); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java index 999e2e60a..6f3816c45 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java @@ -23,6 +23,11 @@ import java.io.Serializable; +/** + * + * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance + */ +@Deprecated public class ReaderConfiguration implements Serializable { private int receiverQueueSize = 1000; @@ -84,8 +89,9 @@ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { /** * Sets the ConsumerCryptoFailureAction to the value specified - * - * @param The consumer action + * + * @param action + * The action to take when the decoding fails */ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) { cryptoFailureAction = action; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java new file mode 100644 index 000000000..2be5318e0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; + +@SuppressWarnings("deprecation") +public class ClientBuilderImpl implements ClientBuilder { + + private static final long serialVersionUID = 1L; + + String serviceUrl; + final ClientConfiguration conf = new ClientConfiguration(); + + @Override + public PulsarClient build() throws PulsarClientException { + if (serviceUrl == null) { + throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object"); + } + + return new PulsarClientImpl(serviceUrl, conf); + } + + @Override + public ClientBuilder clone() { + try { + return (ClientBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ClientBuilderImpl"); + } + } + + @Override + public ClientBuilder serviceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + return this; + } + + @Override + public ClientBuilder authentication(Authentication authentication) { + conf.setAuthentication(authentication); + return this; + } + + @Override + public ClientBuilder authentication(String authPluginClassName, String authParamsString) + throws UnsupportedAuthenticationException { + conf.setAuthentication(authPluginClassName, authParamsString); + return this; + } + + @Override + public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) + throws UnsupportedAuthenticationException { + conf.setAuthentication(authPluginClassName, authParams); + return this; + } + + @Override + public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) { + conf.setOperationTimeout(operationTimeout, unit); + return this; + } + + @Override + public ClientBuilder ioThreads(int numIoThreads) { + conf.setIoThreads(numIoThreads); + return this; + } + + @Override + public ClientBuilder listenerThreads(int numListenerThreads) { + conf.setListenerThreads(numListenerThreads); + return this; + } + + @Override + public ClientBuilder connectionsPerBroker(int connectionsPerBroker) { + conf.setConnectionsPerBroker(connectionsPerBroker); + return this; + } + + @Override + public ClientBuilder enableTcpNoDelay(boolean useTcpNoDelay) { + conf.setUseTcpNoDelay(useTcpNoDelay); + return this; + } + + @Override + public ClientBuilder enableTls(boolean useTls) { + conf.setUseTls(useTls); + return this; + } + + @Override + public ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification) { + conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification); + return this; + } + + @Override + public ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) { + conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); + return this; + } + + @Override + public ClientBuilder allowTlsInsecureConnection(boolean tlsAllowInsecureConnection) { + conf.setTlsAllowInsecureConnection(tlsAllowInsecureConnection); + return this; + } + + @Override + public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) { + conf.setStatsInterval(statsInterval, unit); + return this; + } + + @Override + public ClientBuilder maxConcurrentLookupRequests(int concurrentLookupRequests) { + conf.setConcurrentLookupRequest(concurrentLookupRequests); + return this; + } + + @Override + public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) { + conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java new file mode 100644 index 000000000..ab9132615 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.FutureUtil; + + +@SuppressWarnings("deprecation") +public class ConsumerBuilderImpl implements ConsumerBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + private String topicName; + private String subscriptionName; + private final ConsumerConfiguration conf; + + ConsumerBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ConsumerConfiguration(); + } + + @Override + public ConsumerBuilder clone() { + try { + return (ConsumerBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ConsumerBuilderImpl"); + } + } + + @Override + public Consumer subscribe() throws PulsarClientException { + try { + return subscribeAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture<Consumer> subscribeAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + } + + if (subscriptionName == null) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Subscription name must be set on the producer builder")); + } + + return client.subscribeAsync(topicName, subscriptionName, conf); + } + + @Override + public ConsumerBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ConsumerBuilder subscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + return this; + } + + @Override + public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) { + conf.setAckTimeout(ackTimeout, timeUnit); + return this; + } + + @Override + public ConsumerBuilder subscriptionType(SubscriptionType subscriptionType) { + conf.setSubscriptionType(subscriptionType); + return this; + } + + @Override + public ConsumerBuilder messageListener(MessageListener messageListener) { + conf.setMessageListener(messageListener); + return this; + } + + @Override + public ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ConsumerBuilder receiverQueueSize(int receiverQueueSize) { + conf.setReceiverQueueSize(receiverQueueSize); + return this; + } + + @Override + public ConsumerBuilder consumerName(String consumerName) { + conf.setConsumerName(consumerName); + return this; + } + + @Override + public ConsumerBuilder priorityLevel(int priorityLevel) { + conf.setPriorityLevel(priorityLevel); + return this; + } + + @Override + public ConsumerBuilder property(String key, String value) { + conf.setProperty(key, value); + return this; + } + + @Override + public ConsumerBuilder properties(Map<String, String> properties) { + conf.setProperties(properties); + return this; + } + + @Override + public ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) { + conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions); + return this; + } + + @Override + public ConsumerBuilder readCompacted(boolean readCompacted) { + conf.setReadCompacted(readCompacted); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java new file mode 100644 index 000000000..6bb9a9b59 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.FutureUtil; + +@SuppressWarnings("deprecation") +public class ProducerBuilderImpl implements ProducerBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + private String topicName; + private final ProducerConfiguration conf; + + ProducerBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ProducerConfiguration(); + } + + @Override + public ProducerBuilder clone() { + try { + return (ProducerBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ProducerBuilderImpl"); + } + } + + @Override + public Producer create() throws PulsarClientException { + try { + return createAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture<Producer> createAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + } + + return client.createProducerAsync(topicName, conf); + } + + @Override + public ProducerBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ProducerBuilder producerName(String producerName) { + conf.setProducerName(producerName); + return this; + } + + @Override + public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) { + conf.setSendTimeout(sendTimeout, unit); + return this; + } + + @Override + public ProducerBuilder maxPendingMessages(int maxPendingMessages) { + conf.setMaxPendingMessages(maxPendingMessages); + return this; + } + + @Override + public ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { + conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions); + return this; + } + + @Override + public ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull) { + conf.setBlockIfQueueFull(blockIfQueueFull); + return this; + } + + @Override + public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) { + conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString())); + return this; + } + + @Override + public ProducerBuilder compressionType(CompressionType compressionType) { + conf.setCompressionType(compressionType); + return this; + } + + @Override + public ProducerBuilder messageRouter(MessageRouter messageRouter) { + conf.setMessageRouter(messageRouter); + return this; + } + + @Override + public ProducerBuilder enableBatching(boolean batchMessagesEnabled) { + conf.setBatchingEnabled(batchMessagesEnabled); + return this; + } + + @Override + public ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ProducerBuilder addEncryptionKey(String key) { + conf.addEncryptionKey(key); + return this; + } + + @Override + public ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) { + conf.setBatchingMaxPublishDelay(batchDelay, timeUnit); + return this; + } + + @Override + public ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) { + conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch); + return this; + } + + @Override + public ProducerBuilder initialSequenceId(long initialSequenceId) { + conf.setInitialSequenceId(initialSequenceId); + return this; + } + + @Override + public ProducerBuilder property(String key, String value) { + conf.setProperty(key, value); + return this; + } + + @Override + public ProducerBuilder properties(Map<String, String> properties) { + conf.setProperties(properties); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 029f712f3..6e1c5afde 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -32,13 +32,16 @@ import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderConfiguration; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.util.ExecutorProvider; @@ -58,6 +61,7 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +@SuppressWarnings("deprecation") public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); @@ -92,8 +96,7 @@ public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGr } public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup, - ConnectionPool cnxPool) - throws PulsarClientException { + ConnectionPool cnxPool) throws PulsarClientException { if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } @@ -117,6 +120,21 @@ public ClientConfiguration getConfiguration() { return conf; } + @Override + public ProducerBuilder newProducer() { + return new ProducerBuilderImpl(this); + } + + @Override + public ConsumerBuilder newConsumer() { + return new ConsumerBuilderImpl(this); + } + + @Override + public ReaderBuilder newReader() { + return new ReaderBuilderImpl(this); + } + @Override public Producer createProducer(String destination) throws PulsarClientException { try { @@ -157,6 +175,7 @@ public Producer createProducer(final String destination, final ProducerConfigura return createProducerAsync(topic, new ProducerConfiguration()); } + @Override public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java new file mode 100644 index 000000000..f3751340f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.ReaderConfiguration; +import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.common.util.FutureUtil; + +@SuppressWarnings("deprecation") +public class ReaderBuilderImpl implements ReaderBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + + private final ReaderConfiguration conf; + private String topicName; + private MessageId startMessageId; + + ReaderBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ReaderConfiguration(); + } + + @Override + public ReaderBuilder clone() { + try { + return (ReaderBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ReaderBuilderImpl"); + } + } + + @Override + public Reader create() throws PulsarClientException { + try { + return createAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture<Reader> createAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); + } + + if (startMessageId == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder")); + } + + return client.createReaderAsync(topicName, startMessageId, conf); + } + + @Override + public ReaderBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ReaderBuilder startMessageId(MessageId startMessageId) { + this.startMessageId = startMessageId; + return this; + } + + @Override + public ReaderBuilder readerListener(ReaderListener readerListener) { + conf.setReaderListener(readerListener); + return this; + } + + @Override + public ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ReaderBuilder receiverQueueSize(int receiverQueueSize) { + conf.setReceiverQueueSize(receiverQueueSize); + return this; + } + + @Override + public ReaderBuilder readerName(String readerName) { + conf.setReaderName(readerName); + return this; + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java index 5068a2c06..f96d2885c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java @@ -18,31 +18,25 @@ */ package org.apache.pulsar.client.api; -import org.testng.annotations.Test; - -import com.google.common.base.Objects; - import static org.testng.Assert.assertEquals; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.ConsumerId; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.testng.Assert; +import org.testng.annotations.Test; public class MessageIdTest { - + @Test public void messageIdTest() { MessageId mId = new MessageIdImpl(1, 2, 3); assertEquals(mId.toString(), "1:2:3"); - + mId = new BatchMessageIdImpl(0, 2, 3, 4); assertEquals(mId.toString(), "0:2:3:4"); - + mId = new BatchMessageIdImpl(-1, 2, -3, 4); assertEquals(mId.toString(), "-1:2:-3:4"); - + mId = new MessageIdImpl(0, -23, 3); assertEquals(mId.toString(), "0:-23:3"); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java new file mode 100644 index 000000000..f4b89d5d4 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import org.apache.pulsar.client.api.PulsarClient; +import org.testng.annotations.Test; + +@SuppressWarnings("deprecation") +public class BuildersTest { + + @Test + public void clientBuilderTest() { + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().enableTls(true).ioThreads(10) + .maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650"); + + assertEquals(clientBuilder.conf.isUseTls(), true); + assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650"); + + ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone(); + assertTrue(b2 != clientBuilder); + + b2.serviceUrl("pulsar://other-broker:6650"); + + assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650"); + assertEquals(b2.serviceUrl, "pulsar://other-broker:6650"); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java index 43b7b2699..97f25b149 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java @@ -26,14 +26,14 @@ public class SampleProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { - PulsarClient pulsarClient = PulsarClient.create("http://127.0.0.1:8080"); + PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:6650").build(); - Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic"); + Producer producer = client.newProducer().topic("persistent://my-property/use/my-ns/my-topic").create(); for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); } - pulsarClient.close(); + client.close(); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services