merlimat closed pull request #1089: PIP-12 Introduce builder for creating 
Producer Consumer Reader
URL: https://github.com/apache/incubator-pulsar/pull/1089
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5ff..49213c90d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -22,10 +22,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.AbstractReplicator.State;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -43,7 +43,7 @@
     protected volatile ProducerImpl producer;
 
     protected final int producerQueueSize;
-    protected final ProducerConfiguration producerConfiguration;
+    protected final ProducerBuilder producerBuilder;
 
     protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);
 
@@ -68,10 +68,11 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
         this.producer = null;
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
-        this.producerConfiguration = new ProducerConfiguration();
-        this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
-        this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
-        
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, 
localCluster));
+        this.producerBuilder = client.newProducer() //
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                .maxPendingMessages(producerQueueSize) //
+                .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
         STATE_UPDATER.set(this, State.Stopped);
     }
 
@@ -83,10 +84,6 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
 
     protected abstract void disableReplicatorRead();
 
-    public ProducerConfiguration getProducerConfiguration() {
-        return producerConfiguration;
-    }
-
     public String getRemoteCluster() {
         return remoteCluster;
     }
@@ -121,7 +118,7 @@ public synchronized void startProducer() {
         }
 
         log.info("[{}][{} -> {}] Starting replicator", topicName, 
localCluster, remoteCluster);
-        client.createProducerAsync(topicName, 
producerConfiguration).thenAccept(producer -> {
+        producerBuilder.createAsync().thenAccept(producer -> {
             readEntries(producer);
         }).exceptionally(ex -> {
             if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d914aa7ec..e9219a614 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -52,7 +52,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, 
String localCluster, St
             BrokerService brokerService) {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
 
-        producerConfiguration.setBlockIfQueueFull(false);
+        producerBuilder.blockIfQueueFull(false);
 
         startProducer();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 55019a0d6..a8ec9400c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1149,17 +1149,17 @@ public void testClosingReplicationProducerTwice() 
throws Exception {
         brokerService.getReplicationClients().put(remoteCluster, client);
         PersistentReplicator replicator = new PersistentReplicator(topic, 
cursor, localCluster, remoteCluster, brokerService);
 
-        doReturn(new 
CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName,
 replicator.getProducerConfiguration());
+        doReturn(new 
CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName),
 any());
 
         replicator.startProducer();
-        verify(clientImpl).createProducerAsync(globalTopicName, 
replicator.getProducerConfiguration());
+        verify(clientImpl).createProducerAsync(matches(globalTopicName), 
any());
 
         replicator.disconnect(false);
         replicator.disconnect(false);
 
         replicator.startProducer();
 
-        verify(clientImpl, 
Mockito.times(2)).createProducerAsync(globalTopicName, 
replicator.getProducerConfiguration());
+        verify(clientImpl, 
Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
     }
 
     @Test
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
new file mode 100644
index 000000000..b832e59c9
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+
+/**
+ * Builder interface that is used to construct a {@link PulsarClient} instance.
+ *
+ * @since 2.0.0
+ */
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    /**
+     * @return the new {@link PulsarClient} instance
+     */
+    PulsarClient build() throws PulsarClientException;
+
+    /**
+     * Create a copy of the current client builder.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ClientBuilder builder = 
PulsarClient.builder().ioThreads(8).listenerThreads(4);
+     *
+     * PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
+     * PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
+     * </pre>
+     */
+    ClientBuilder clone();
+
+    /**
+     * Configure the service URL for the Pulsar service.
+     * <p>
+     * This parameter is required
+     *
+     * @param serviceUrl
+     * @return
+     */
+    ClientBuilder serviceUrl(String serviceUrl);
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map<String, String> conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf);
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(auth)
+     *          .build();
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authentication
+     *            an instance of the {@link Authentication} provider already 
constructed
+     */
+    ClientBuilder authentication(Authentication authentication);
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     * String AUTH_PARAMS = 
"tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, AUTH_PARAMS)
+     *          .build();
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParamsString
+     *            string which represents parameters for the 
Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    ClientBuilder authentication(String authPluginClassName, String 
authParamsString)
+            throws UnsupportedAuthenticationException;
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map<String, String> conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, conf)
+     *          .build();
+     * ....
+     * </code>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    ClientBuilder authentication(String authPluginClassName, Map<String, 
String> authParams)
+            throws UnsupportedAuthenticationException;
+
+    /**
+     * Set the operation timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * Producer-create, subscribe and unsubscribe operations will be retried 
until this interval, after which the
+     * operation will be maked as failed
+     *
+     * @param operationTimeout
+     *            operation timeout
+     * @param unit
+     *            time unit for {@code operationTimeout}
+     */
+    ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
+
+    /**
+     * Set the number of threads to be used for handling connections to 
brokers <i>(default: 1 thread)</i>
+     *
+     * @param numIoThreads
+     */
+    ClientBuilder ioThreads(int numIoThreads);
+
+    /**
+     * Set the number of threads to be used for message listeners <i>(default: 
1 thread)</i>
+     *
+     * @param numListenerThreads
+     */
+    ClientBuilder listenerThreads(int numListenerThreads);
+
+    /**
+     * Sets the max number of connection that the client library will open to 
a single broker.
+     * <p>
+     * By default, the connection pool will use a single connection for all 
the producers and consumers. Increasing this
+     * parameter may improve throughput when using many producers over a high 
latency connection.
+     * <p>
+     *
+     * @param connectionsPerBroker
+     *            max number of connections per broker (needs to be greater 
than 0)
+     */
+    ClientBuilder connectionsPerBroker(int connectionsPerBroker);
+
+    /**
+     * Configure whether to use TCP no-delay flag on the connection, to 
disable Nagle algorithm.
+     * <p>
+     * No-delay features make sure packets are sent out on the network as soon 
as possible, and it's critical to achieve
+     * low latency publishes. On the other hand, sending out a huge number of 
small packets might limit the overall
+     * throughput, so if latency is not a concern, it's advisable to set the 
<code>useTcpNoDelay</code> flag to false.
+     * <p>
+     * Default value is true
+     *
+     * @param enableTcpNoDelay
+     */
+    ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
+
+    /**
+     * Configure whether to use TLS encryption on the connection <i>(default: 
false)</i>
+     *
+     * @param enableTls
+     */
+    ClientBuilder enableTls(boolean enableTls);
+
+    /**
+     * Set the path to the trusted TLS certificate file
+     *
+     * @param tlsTrustCertsFilePath
+     */
+    ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
+
+    /**
+     * Configure whether the Pulsar client accept untrusted TLS certificate 
from broker <i>(default: false)</i>
+     *
+     * @param allowTlsInsecureConnection
+     */
+    ClientBuilder allowTlsInsecureConnection(boolean 
allowTlsInsecureConnection);
+
+    /**
+     * It allows to validate hostname verification when client connects to 
broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected 
broker's host name. It follows RFC 2818, 3.1.
+     * Server Identity hostname verification.
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818";>rfc2818</a>
+     *
+     * @param enableTlsHostnameVerification
+     */
+    ClientBuilder enableTlsHostnameVerification(boolean 
enableTlsHostnameVerification);
+
+    /**
+     * Set the interval between each stat info <i>(default: 60 seconds)</i> 
Stats will be activated with positive
+     * statsIntervalSeconds It should be set to at least 1 second
+     *
+     * @param statsIntervalSeconds
+     *            the interval between each stat info
+     * @param unit
+     *            time unit for {@code statsInterval}
+     */
+    ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
+
+    /**
+     * Number of concurrent lookup-requests allowed on each broker-connection 
to prevent overload on broker.
+     * <i>(default: 5000)</i> It should be configured with higher value only 
in case of it requires to produce/subscribe
+     * on thousands of topic using created {@link PulsarClient}
+     *
+     * @param maxConcurrentLookupRequests
+     */
+    ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
+
+    /**
+     * Set max number of broker-rejected requests in a certain time-frame (30 
seconds) after which current connection
+     * will be closed and client creates a new connection that give chance to 
connect a different broker <i>(default:
+     * 50)</i>
+     *
+     * @param maxNumberOfRejectedRequestPerConnection
+     */
+    ClientBuilder maxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRequestPerConnection);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 9ab7b32b1..14f94da5a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -30,14 +30,13 @@
 /**
  * Class used to specify client side configuration like authentication, etc..
  *
- *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a 
new {@link PulsarClient} instance
  */
+@Deprecated
 public class ClientConfiguration implements Serializable {
 
-    /**
-     *
-     */
     private static final long serialVersionUID = 1L;
+
     private Authentication authentication = new AuthenticationDisabled();
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
@@ -221,8 +220,7 @@ public int getConnectionsPerBroker() {
      *            max number of connections per broker (needs to be greater 
than 0)
      */
     public void setConnectionsPerBroker(int connectionsPerBroker) {
-        checkArgument(connectionsPerBroker > 0,
-                "Connections per broker need to be greater than 0");
+        checkArgument(connectionsPerBroker > 0, "Connections per broker need 
to be greater than 0");
         this.connectionsPerBroker = connectionsPerBroker;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
new file mode 100644
index 000000000..a2c3c8133
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ConsumerBuilder} is used to configure and create instances of {@link 
Consumer}.
+ *
+ * @see PulsarClient#newConsumer()
+ *
+ * @since 2.0.0
+ */
+public interface ConsumerBuilder extends Serializable, Cloneable {
+
+    /**
+     * Create a copy of the current consumer builder.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ConsumerBuilder builder = client.newConsumer() //
+     *         .subscriptionName("my-subscription-name") //
+     *         .subscriptionType(SubscriptionType.Shared) //
+     *         .receiverQueueSize(10);
+     *
+     * Consumer consumer1 = builder.clone().topic(TOPIC_1).subscribe();
+     * Consumer consumer2 = builder.clone().topic(TOPIC_2).subscribe();
+     * </pre>
+     */
+    ConsumerBuilder clone();
+
+    /**
+     * Finalize the {@link Consumer} creation by subscribing to the topic.
+     *
+     * <p>
+     * If the subscription does not exist, a new subscription will be created 
and all messages published after the
+     * creation will be retained until acknowledged, even if the consumer is 
not connected.
+     *
+     * @return the {@link Consumer} instance
+     * @throws PulsarClientException
+     *             if the the subscribe operation fails
+     */
+    Consumer subscribe() throws PulsarClientException;
+
+    /**
+     * Finalize the {@link Consumer} creation by subscribing to the topic in 
asynchronous mode.
+     *
+     * <p>
+     * If the subscription does not exist, a new subscription will be created 
and all messages published after the
+     * creation will be retained until acknowledged, even if the consumer is 
not connected.
+     *
+     * @return a future that will yield a {@link Consumer} instance
+     * @throws PulsarClientException
+     *             if the the subscribe operation fails
+     */
+    CompletableFuture<Consumer> subscribeAsync();
+
+    /**
+     * Specify the topic this consumer will subscribe on.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param topicName
+     */
+    ConsumerBuilder topic(String topicName);
+
+    /**
+     * Specify the subscription name for this consumer.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param subscriptionName
+     */
+    ConsumerBuilder subscriptionName(String subscriptionName);
+
+    /**
+     * Set the timeout for unacked messages, truncated to the nearest 
millisecond. The timeout needs to be greater than
+     * 10 seconds.
+     *
+     * @param ackTimeout
+     *            for unacked messages.
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return {@link ConsumerConfiguration}
+     */
+    ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit);
+
+    /**
+     * Select the subscription type to be used when subscribing to the topic.
+     * <p>
+     * Default is {@link SubscriptionType#Exclusive}
+     *
+     * @param subscriptionType
+     *            the subscription type value
+     */
+    ConsumerBuilder subscriptionType(SubscriptionType subscriptionType);
+
+    /**
+     * Sets a {@link MessageListener} for the consumer
+     * <p>
+     * When a {@link MessageListener} is set, application will receive 
messages through it. Calls to
+     * {@link Consumer#receive()} will not be allowed.
+     *
+     * @param messageListener
+     *            the listener object
+     */
+    ConsumerBuilder messageListener(MessageListener messageListener);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The consumer action
+     */
+    ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be 
accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value 
could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * <p>
+     * <b>Setting the consumer queue size as zero</b>
+     * <ul>
+     * <li>Decreases the throughput of the consumer, by disabling pre-fetching 
of messages. This approach improves the
+     * message distribution on shared subscription, by pushing messages only 
to the consumers that are ready to process
+     * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned 
Topics can be used if the consumer queue
+     * size is zero. {@link Consumer#receive()} function call should not be 
interrupted when the consumer queue size is
+     * zero.</li>
+     * <li>Doesn't support Batch-Message: if consumer receives any 
batch-message then it closes consumer connection with
+     * broker and {@link Consumer#receive()} call will remain blocked while 
{@link Consumer#receiveAsync()} receives
+     * exception in callback. <b> consumer will not be able receive any 
further message unless batch-message in pipeline
+     * is removed</b></li>
+     * </ul>
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use 
cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    ConsumerBuilder receiverQueueSize(int receiverQueueSize);
+
+    /**
+     * Set the max total receiver queue size across partitons.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for 
individual partitions
+     * {@link #receiverQueueSize(int)} if the total exceeds this value 
(default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int 
maxTotalReceiverQueueSizeAcrossPartitions);
+
+    /**
+     * Set the consumer name.
+     *
+     * @param consumerName
+     */
+    ConsumerBuilder consumerName(String consumerName);
+
+    /**
+     * If enabled, the consumer will read messages from the compacted topic 
rather than reading the full message backlog
+     * of the topic. This means that, if the topic has been compacted, the 
consumer will only see the latest value for
+     * each key in the topic, up until the point in the topic message backlog 
that has been compacted. Beyond that
+     * point, the messages will be sent as normal.
+     *
+     * readCompacted can only be enabled subscriptions to persistent topics, 
which have a single active consumer (i.e.
+     * failure or exclusive subscriptions). Attempting to enable it on 
subscriptions to a non-persistent topics or on a
+     * shared subscription, will lead to the subscription call throwing a 
PulsarClientException.
+     *
+     * @param readCompacted
+     *            whether to read from the compacted topic
+     */
+    ConsumerBuilder readCompacted(boolean readCompacted);
+
+    /**
+     * Sets priority level for the shared subscription consumers to which 
broker gives more priority while dispatching
+     * messages. Here, broker follows descending priorities. (eg: 
0=max-priority, 1, 2,..) </br>
+     * In Shared subscription mode, broker will first dispatch messages to max 
priority-level consumers if they have
+     * permits, else broker will consider next priority level consumers. </br>
+     * If subscription has consumer-A with priorityLevel 0 and Consumer-B with 
priorityLevel 1 then broker will dispatch
+     * messages to only consumer-A until it runs out permit and then broker 
starts dispatching messages to Consumer-B.
+     *
+     * <pre>
+     * Consumer PriorityLevel Permits
+     * C1       0             2
+     * C2       0             1
+     * C3       0             1
+     * C4       1             2
+     * C5       1             1
+     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, 
C4, C5, C4
+     * </pre>
+     *
+     * @param priorityLevel
+     */
+    ConsumerBuilder priorityLevel(int priorityLevel);
+
+    /**
+     * Set a name/value property with this consumer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    ConsumerBuilder property(String key, String value);
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    ConsumerBuilder properties(Map<String, String> properties);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 00e4537ad..5f25fa83b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -31,8 +31,9 @@
  * attach to the subscription. Other consumers will get an error message. In 
Shared subscription, multiple consumers
  * will be able to use the same subscription name and the messages will be 
dispatched in a round robin fashion.
  *
- *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a 
{@link Consumer} instance
  */
+@Deprecated
 public class ConsumerConfiguration implements Serializable {
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
index 46496b7b7..4acb6e9cf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
@@ -18,34 +18,33 @@
  */
 package org.apache.pulsar.client.api;
 
-import java.util.List;
 import java.util.Map;
 
 public interface CryptoKeyReader {
 
-    /*
+    /**
      * Return the encryption key corresponding to the key name in the argument
      * <p>
-     * This method should be implemented to return the EncryptionKeyInfo. This 
method will be
-     * called at the time of producer creation as well as consumer receiving 
messages.
-     * Hence, application should not make any blocking calls within the 
implementation. 
+     * This method should be implemented to return the EncryptionKeyInfo. This 
method will be called at the time of
+     * producer creation as well as consumer receiving messages. Hence, 
application should not make any blocking calls
+     * within the implementation.
      * <p>
-     * 
-    * @param keyName
-    *            Unique name to identify the key
-    * @param metadata
-    *            Additional information needed to identify the key
-    * @return EncryptionKeyInfo with details about the public key
-    * */
+     *
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @return EncryptionKeyInfo with details about the public key
+     */
     EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
metadata);
 
-    /*
-    * @param keyName
-    *            Unique name to identify the key
-    * @param metadata
-    *            Additional information needed to identify the key
-    * @return byte array of the private key value
-    */
+    /**
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @return byte array of the private key value
+     */
     EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
metadata);
-    
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index 25c89755b..a9cef6f73 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -23,7 +23,7 @@
 public interface MessageRouter extends Serializable {
 
     /**
-     * 
+     *
      * @param msg
      *            Message object
      * @return The index of the partition to use for the message
@@ -42,7 +42,6 @@ default int choosePartition(Message msg) {
      * @return the partition to route the message.
      * @since 1.22.0
      */
-    @SuppressWarnings("deprecation")
     default int choosePartition(Message msg, TopicMetadata metadata) {
         return choosePartition(msg);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
new file mode 100644
index 000000000..1d45489b9
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+public enum MessageRoutingMode {
+    SinglePartition, RoundRobinPartition, CustomPartition
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
new file mode 100644
index 000000000..5ab1215dc
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
+
+/**
+ * {@link ProducerBuilder} is used to configure and create instances of {@link 
Producer}.
+ *
+ * @see PulsarClient#newProducer()
+ */
+public interface ProducerBuilder extends Serializable, Cloneable {
+
+    /**
+     * Finalize the creation of the {@link Producer} instance.
+     * <p>
+     * This method will block until the producer is created successfully.
+     *
+     * @return the producer instance
+     * @throws PulsarClientException.ProducerBusyException
+     *             if a producer with the same "producer name" is already 
connected to the topic
+     * @throws PulsarClientException
+     *             if the producer creation fails
+     */
+    Producer create() throws PulsarClientException;
+
+    /**
+     * Finalize the creation of the {@link Producer} instance in asynchronous 
mode.
+     * <p>
+     * This method will return a {@link CompletableFuture} that can be used to 
access the instance when it's ready.
+     *
+     * @return a future that will yield the created producer instance
+     * @throws PulsarClientException.ProducerBusyException
+     *             if a producer with the same "producer name" is already 
connected to the topic
+     * @throws PulsarClientException
+     *             if the producer creation fails
+     */
+    CompletableFuture<Producer> createAsync();
+
+    /**
+     * Create a copy of the current {@link ProducerBuilder}.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ProducerBuilder builder = client.newProducer().sendTimeout(10, 
TimeUnit.SECONDS).blockIfQueueFull(true);
+     *
+     * Producer producer1 = builder.clone().topic(TOPIC_1).create();
+     * Producer producer2 = builder.clone().topic(TOPIC_2).create();
+     * </pre>
+     */
+    ProducerBuilder clone();
+
+    /**
+     * Specify the topic this producer will be publishing on.
+     * <p>
+     * This argument is required when constructing the produce.
+     *
+     * @param topicName
+     */
+    ProducerBuilder topic(String topicName);
+
+    /**
+     * Specify a name for the producer
+     * <p>
+     * If not assigned, the system will generate a globally unique name which 
can be access with
+     * {@link Producer#getProducerName()}.
+     * <p>
+     * When specifying a name, it is up to the user to ensure that, for a 
given topic, the producer name is unique
+     * across all Pulsar's clusters. Brokers will enforce that only a single 
producer a given name can be publishing on
+     * a topic.
+     *
+     * @param producerName
+     *            the custom name to use for the producer
+     */
+    ProducerBuilder producerName(String producerName);
+
+    /**
+     * Set the send timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * If a message is not acknowledged by the server before the sendTimeout 
expires, an error will be reported.
+     *
+     * @param sendTimeout
+     *            the send timeout
+     * @param unit
+     *            the time unit of the {@code sendTimeout}
+     */
+    ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit);
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker.
+     * <p>
+     * When the queue is full, by default, all calls to {@link Producer#send} 
and {@link Producer#sendAsync} will fail
+     * unless blockIfQueueFull is set to true. Use {@link 
#setBlockIfQueueFull} to change the blocking behavior.
+     *
+     * @param maxPendingMessages
+     * @return
+     */
+    ProducerBuilder maxPendingMessages(int maxPendingMessages);
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each 
partition
+     * ({@link #maxPendingMessages(int)}), if the total exceeds the configured 
value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    ProducerBuilder maxPendingMessagesAcrossPartitions(int 
maxPendingMessagesAcrossPartitions);
+
+    /**
+     * Set whether the {@link Producer#send} and {@link Producer#sendAsync} 
operations should block when the outgoing
+     * message queue is full.
+     * <p>
+     * Default is <code>false</code>. If set to <code>false</code>, send 
operations will immediately fail with
+     * {@link ProducerQueueIsFullError} when there is no space left in pending 
queue.
+     *
+     * @param blockIfQueueFull
+     *            whether to block {@link Producer#send} and {@link 
Producer#sendAsync} operations on queue full
+     * @return
+     */
+    ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull);
+
+    /**
+     * Set the message routing mode for the partitioned producer
+     *
+     * @param mode
+     * @return
+     */
+    ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode);
+
+    /**
+     * Set the compression type for the producer.
+     * <p>
+     * By default, message payloads are not compressed. Supported compression 
types are:
+     * <ul>
+     * <li><code>CompressionType.LZ4</code></li>
+     * <li><code>CompressionType.ZLIB</code></li>
+     * </ul>
+     *
+     * @param compressionType
+     * @return
+     */
+    ProducerBuilder compressionType(CompressionType compressionType);
+
+    /**
+     * Set a custom message routing policy by passing an implementation of 
MessageRouter
+     *
+     *
+     * @param messageRouter
+     */
+    ProducerBuilder messageRouter(MessageRouter messageRouter);
+
+    /**
+     * Control whether automatic batching of messages is enabled for the 
producer. <i>default: false [No batching]</i>
+     *
+     * When batching is enabled, multiple calls to Producer.sendAsync can 
result in a single batch to be sent to the
+     * broker, leading to better throughput, especially when publishing small 
messages. If compression is enabled,
+     * messages will be compressed at the batch level, leading to a much 
better compression ratio for similar headers or
+     * contents.
+     *
+     * When enabled default batch delay is set to 10 ms and default batch size 
is 1000 messages
+     *
+     * @see #batchingMaxPublishDelay(long, TimeUnit)
+     */
+    ProducerBuilder enableBatching(boolean enableBatching);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Add public encryption key, used by producer to encrypt the data key.
+     *
+     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to 
load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If 
compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message 
is encrypted.
+     *
+     */
+    ProducerBuilder addEncryptionKey(String key);
+
+    /**
+     * Sets the ProducerCryptoFailureAction to the value specified
+     *
+     * @param The
+     *            producer action
+     */
+    ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action);
+
+    /**
+     * Set the time period within which the messages sent will be batched 
<i>default: 10ms</i> if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until this 
time interval or until
+     *
+     * @see ProducerConfiguration#batchingMaxMessages threshold is reached; 
all messages will be published as a single
+     *      batch message. The consumer will be delivered individual messages 
in the batch in the same order they were
+     *      enqueued
+     * @param batchDelay
+     *            the batch delay
+     * @param timeUnit
+     *            the time unit of the {@code batchDelay}
+     * @return
+     */
+    ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit 
timeUnit);
+
+    /**
+     * Set the maximum number of messages permitted in a batch. <i>default: 
1000</i> If set to a value greater than 1,
+     * messages will be queued until this threshold is reached or batch 
interval has elapsed
+     *
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) 
All messages in batch will be published as
+     *      a single batch message. The consumer will be delivered individual 
messages in the batch in the same order
+     *      they were enqueued
+     * @param batchMessagesMaxMessagesPerBatch
+     *            maximum number of messages in a batch
+     * @return
+     */
+    ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the 
producer.
+     * <p>
+     * First message will be using (initialSequenceId + 1) as its sequence id 
and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     *
+     * @param initialSequenceId
+     * @return
+     */
+    ProducerBuilder initialSequenceId(long initialSequenceId);
+
+    /**
+     * Set a name/value property with this producer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    ProducerBuilder property(String key, String value);
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    ProducerBuilder properties(Map<String, String> properties);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index a6d2a5583..be72fc7e0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -28,7 +28,6 @@
 import java.util.concurrent.TimeUnit;
 
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
 import com.google.common.base.Objects;
@@ -36,7 +35,9 @@
 /**
  * Producer's configuration
  *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and 
configure a {@link Producer} instance
  */
+@Deprecated
 public class ProducerConfiguration implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -268,7 +269,7 @@ public ProducerConfiguration setMessageRouter(MessageRouter 
messageRouter) {
      *
      * @return message router.
      * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already 
passed as parameter in
-     * {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+     *             {@link MessageRouter#choosePartition(Message, 
TopicMetadata)}.
      * @see MessageRouter
      */
     @Deprecated
@@ -338,7 +339,7 @@ public ProducerConfiguration 
setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      * @return encryptionKeys
      *
      */
-    public  ConcurrentOpenHashSet<String> getEncryptionKeys() {
+    public ConcurrentOpenHashSet<String> getEncryptionKeys() {
         return this.encryptionKeys;
     }
 
@@ -354,16 +355,15 @@ public boolean isEncryptionEnabled() {
     /**
      * Add public encryption key, used by producer to encrypt the data key.
      *
-     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys.
-     * If keys are found, a callback getKey(String keyName) is invoked against 
each key to load
-     * the values of the key. Application should implement this callback to 
return the key in pkcs8 format.
-     * If compression is enabled, message is encrypted after compression.
-     * If batch messaging is enabled, the batched message is encrypted.
+     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to 
load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If 
compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message 
is encrypted.
      *
      */
     public void addEncryptionKey(String key) {
         if (this.encryptionKeys == null) {
-            this.encryptionKeys = new ConcurrentOpenHashSet<String>(16,1);
+            this.encryptionKeys = new ConcurrentOpenHashSet<String>(16, 1);
         }
         this.encryptionKeys.add(key);
     }
@@ -377,7 +377,8 @@ public void removeEncryptionKey(String key) {
     /**
      * Sets the ProducerCryptoFailureAction to the value specified
      *
-     * @param The producer action
+     * @param action
+     *            The producer action
      */
     public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
         cryptoFailureAction = action;
@@ -467,6 +468,7 @@ public ProducerConfiguration setInitialSequenceId(long 
initialSequenceId) {
 
     /**
      * Set a name/value property with this producer.
+     *
      * @param key
      * @param value
      * @return
@@ -480,6 +482,7 @@ public ProducerConfiguration setProperty(String key, String 
value) {
 
     /**
      * Add all the properties in the provided map
+     *
      * @param properties
      * @return
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index e15db40a6..6ba2518d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -21,15 +21,28 @@
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 
 /**
- * Class that provides a client interface to Pulsar
- *
- *
+ * Class that provides a client interface to Pulsar.
+ * <p>
+ * Client instances are thread-safe and can be reused for managing multiple 
{@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
  */
 public interface PulsarClient extends Closeable {
 
+    /**
+     * Get a new builder instance that can used to configure and build a 
{@link PulsarClient} instance.
+     *
+     * @return the {@link ClientBuilder}
+     *
+     * @since 2.0.0
+     */
+    public static ClientBuilder builder() {
+        return new ClientBuilderImpl();
+    }
+
     /**
      * Create a new PulsarClient object using default client configuration
      *
@@ -38,7 +51,9 @@
      * @return a new pulsar client object
      * @throws PulsarClientException.InvalidServiceURL
      *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
      */
+    @Deprecated
     public static PulsarClient create(String serviceUrl) throws 
PulsarClientException {
         return create(serviceUrl, new ClientConfiguration());
     }
@@ -53,11 +68,50 @@ public static PulsarClient create(String serviceUrl) throws 
PulsarClientExceptio
      * @return a new pulsar client object
      * @throws PulsarClientException.InvalidServiceURL
      *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
      */
+    @Deprecated
     public static PulsarClient create(String serviceUrl, ClientConfiguration 
conf) throws PulsarClientException {
         return new PulsarClientImpl(serviceUrl, conf);
     }
 
+    /**
+     * Create a producer with default for publishing on a specific topic
+     * <p>
+     * Example:
+     *
+     * <code>
+     * Producer producer = client.newProducer().topic(myTopic).create();
+     * </code>
+     *
+     *
+     * @return a {@link ProducerBuilder} object to configure and construct the 
{@link Producer} instance
+     *
+     * @since 2.0.0
+     */
+    ProducerBuilder newProducer();
+
+    /**
+     * Create a producer with default for publishing on a specific topic
+     *
+     * @return a {@link ProducerBuilder} object to configure and construct the 
{@link Producer} instance
+     *
+     * @since 2.0.0
+     */
+    ConsumerBuilder newConsumer();
+
+    /**
+     * Create a topic reader for reading messages from the specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual 
positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     *
+     * @return a {@link ReaderBuilder} that can be used to configure and 
construct a {@link Reader} instance
+     *
+     * @since 2.0.0
+     */
+    ReaderBuilder newReader();
+
     /**
      * Create a producer with default {@link ProducerConfiguration} for 
publishing on a specific topic
      *
@@ -72,7 +126,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      *             if there was an error with the supplied credentials
      * @throws PulsarClientException.AuthorizationException
      *             if the authorization to publish on topic was denied
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     Producer createProducer(String topic) throws PulsarClientException;
 
     /**
@@ -81,7 +137,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param topic
      *            The name of the topic where to produce
      * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     CompletableFuture<Producer> createProducerAsync(String topic);
 
     /**
@@ -95,7 +153,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @throws PulsarClientException
      *             if it was not possible to create the producer
      * @throws InterruptedException
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     Producer createProducer(String topic, ProducerConfiguration conf) throws 
PulsarClientException;
 
     /**
@@ -106,7 +166,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param conf
      *            The {@code ProducerConfiguration} object
      * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     CompletableFuture<Producer> createProducerAsync(String topic, 
ProducerConfiguration conf);
 
     /**
@@ -119,7 +181,10 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @return The {@code Consumer} object
      * @throws PulsarClientException
      * @throws InterruptedException
+     *
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     Consumer subscribe(String topic, String subscription) throws 
PulsarClientException;
 
     /**
@@ -131,7 +196,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param subscription
      *            The subscription name
      * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     CompletableFuture<Consumer> subscribeAsync(String topic, String 
subscription);
 
     /**
@@ -145,7 +212,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      *            The {@code ConsumerConfiguration} object
      * @return The {@code Consumer} object
      * @throws PulsarClientException
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     Consumer subscribe(String topic, String subscription, 
ConsumerConfiguration conf) throws PulsarClientException;
 
     /**
@@ -159,7 +228,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param conf
      *            The {@code ConsumerConfiguration} object
      * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     CompletableFuture<Consumer> subscribeAsync(String topic, String 
subscription, ConsumerConfiguration conf);
 
     /**
@@ -185,7 +256,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param conf
      *            The {@code ReaderConfiguration} object
      * @return The {@code Reader} object
+     * @deprecated Use {@link #newReader()} to build a new reader
      */
+    @Deprecated
     Reader createReader(String topic, MessageId startMessageId, 
ReaderConfiguration conf) throws PulsarClientException;
 
     /**
@@ -212,7 +285,9 @@ public static PulsarClient create(String serviceUrl, 
ClientConfiguration conf) t
      * @param conf
      *            The {@code ReaderConfiguration} object
      * @return Future of the asynchronously created producer object
+     * @deprecated Use {@link #newReader()} to build a new reader
      */
+    @Deprecated
     CompletableFuture<Reader> createReaderAsync(String topic, MessageId 
startMessageId, ReaderConfiguration conf);
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
new file mode 100644
index 000000000..196af2057
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ReaderBuilder} is used to configure and create instances of {@link 
Reader}.
+ *
+ * @see PulsarClient#newReader()
+ *
+ * @since 2.0.0
+ */
+public interface ReaderBuilder extends Serializable, Cloneable {
+
+    /**
+     * Finalize the creation of the {@link Reader} instance.
+     *
+     * <p>
+     * This method will block until the reader is created successfully.
+     *
+     * @return the reader instance
+     * @throws PulsarClientException
+     *             if the reader creation fails
+     */
+    Reader create() throws PulsarClientException;
+
+    /**
+     * Finalize the creation of the {@link Reader} instance in asynchronous 
mode.
+     *
+     * <p>
+     * This method will return a {@link CompletableFuture} that can be used to 
access the instance when it's ready.
+     *
+     * @return the reader instance
+     * @throws PulsarClientException
+     *             if the reader creation fails
+     */
+    CompletableFuture<Reader> createAsync();
+
+    /**
+     * Create a copy of the current {@link ReaderBuilder}.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ReaderBuilder builder = 
client.newReader().readerName("my-reader").receiverQueueSize(10);
+     *
+     * Reader reader1 = builder.clone().topic(TOPIC_1).create();
+     * Reader reader2 = builder.clone().topic(TOPIC_2).create();
+     * </pre>
+     */
+    ReaderBuilder clone();
+
+    /**
+     * Specify the topic this consumer will subscribe on.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param topicName
+     */
+    ReaderBuilder topic(String topicName);
+
+    /**
+     * The initial reader positioning is done by specifying a message id. The 
options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest 
message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end topic, 
only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the 
reader will position itself on that
+     * specific position. The first message to be read will be the message 
next to the specified messageId.
+     * </ul>
+     */
+    ReaderBuilder startMessageId(MessageId startMessageId);
+
+    /**
+     * Sets a {@link ReaderListener} for the reader
+     * <p>
+     * When a {@link ReaderListener} is set, application will receive messages 
through it. Calls to
+     * {@link Reader#readNext()} will not be allowed.
+     *
+     * @param readerListener
+     *            the listener object
+     */
+    ReaderBuilder readerListener(ReaderListener readerListener);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The action to take when the decoding fails
+     */
+    ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be 
accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value 
could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use 
cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    ReaderBuilder receiverQueueSize(int receiverQueueSize);
+
+    /**
+     * Set the reader name.
+     *
+     * @param readerName
+     */
+    ReaderBuilder readerName(String readerName);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
index 999e2e60a..6f3816c45 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -23,6 +23,11 @@
 
 import java.io.Serializable;
 
+/**
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure 
a {@link Reader} instance
+ */
+@Deprecated
 public class ReaderConfiguration implements Serializable {
 
     private int receiverQueueSize = 1000;
@@ -84,8 +89,9 @@ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader 
cryptoKeyReader) {
 
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
-     * 
-     * @param The consumer action
+     *
+     * @param action
+     *            The action to take when the decoding fails
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
         cryptoFailureAction = action;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
new file mode 100644
index 000000000..2be5318e0
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+
+@SuppressWarnings("deprecation")
+public class ClientBuilderImpl implements ClientBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    String serviceUrl;
+    final ClientConfiguration conf = new ClientConfiguration();
+
+    @Override
+    public PulsarClient build() throws PulsarClientException {
+        if (serviceUrl == null) {
+            throw new IllegalArgumentException("service URL needs to be 
specified on the ClientBuilder object");
+        }
+
+        return new PulsarClientImpl(serviceUrl, conf);
+    }
+
+    @Override
+    public ClientBuilder clone() {
+        try {
+            return (ClientBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ClientBuilderImpl");
+        }
+    }
+
+    @Override
+    public ClientBuilder serviceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(Authentication authentication) {
+        conf.setAuthentication(authentication);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(String authPluginClassName, String 
authParamsString)
+            throws UnsupportedAuthenticationException {
+        conf.setAuthentication(authPluginClassName, authParamsString);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(String authPluginClassName, 
Map<String, String> authParams)
+            throws UnsupportedAuthenticationException {
+        conf.setAuthentication(authPluginClassName, authParams);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) 
{
+        conf.setOperationTimeout(operationTimeout, unit);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder ioThreads(int numIoThreads) {
+        conf.setIoThreads(numIoThreads);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder listenerThreads(int numListenerThreads) {
+        conf.setListenerThreads(numListenerThreads);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
+        conf.setConnectionsPerBroker(connectionsPerBroker);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTcpNoDelay(boolean useTcpNoDelay) {
+        conf.setUseTcpNoDelay(useTcpNoDelay);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTls(boolean useTls) {
+        conf.setUseTls(useTls);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTlsHostnameVerification(boolean 
enableTlsHostnameVerification) {
+        conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+        conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder allowTlsInsecureConnection(boolean 
tlsAllowInsecureConnection) {
+        conf.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
+        conf.setStatsInterval(statsInterval, unit);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder maxConcurrentLookupRequests(int 
concurrentLookupRequests) {
+        conf.setConcurrentLookupRequest(concurrentLookupRequests);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder maxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRequestPerConnection) {
+        
conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
new file mode 100644
index 000000000..ab9132615
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.FutureUtil;
+
+
+@SuppressWarnings("deprecation")
+public class ConsumerBuilderImpl implements ConsumerBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+    private String topicName;
+    private String subscriptionName;
+    private final ConsumerConfiguration conf;
+
+    ConsumerBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ConsumerConfiguration();
+    }
+
+    @Override
+    public ConsumerBuilder clone() {
+        try {
+            return (ConsumerBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ConsumerBuilderImpl");
+        }
+    }
+
+    @Override
+    public Consumer subscribe() throws PulsarClientException {
+        try {
+            return subscribeAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name 
must be set on the producer builder"));
+        }
+
+        if (subscriptionName == null) {
+            return FutureUtil.failedFuture(
+                    new IllegalArgumentException("Subscription name must be 
set on the producer builder"));
+        }
+
+        return client.subscribeAsync(topicName, subscriptionName, conf);
+    }
+
+    @Override
+    public ConsumerBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder subscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) {
+        conf.setAckTimeout(ackTimeout, timeUnit);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder subscriptionType(SubscriptionType subscriptionType) 
{
+        conf.setSubscriptionType(subscriptionType);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder messageListener(MessageListener messageListener) {
+        conf.setMessageListener(messageListener);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction 
action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder receiverQueueSize(int receiverQueueSize) {
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder consumerName(String consumerName) {
+        conf.setConsumerName(consumerName);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder priorityLevel(int priorityLevel) {
+        conf.setPriorityLevel(priorityLevel);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder property(String key, String value) {
+        conf.setProperty(key, value);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder properties(Map<String, String> properties) {
+        conf.setProperties(properties);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int 
maxTotalReceiverQueueSizeAcrossPartitions) {
+        
conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder readCompacted(boolean readCompacted) {
+        conf.setReadCompacted(readCompacted);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
new file mode 100644
index 000000000..6bb9a9b59
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@SuppressWarnings("deprecation")
+public class ProducerBuilderImpl implements ProducerBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+    private String topicName;
+    private final ProducerConfiguration conf;
+
+    ProducerBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ProducerConfiguration();
+    }
+
+    @Override
+    public ProducerBuilder clone() {
+        try {
+            return (ProducerBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ProducerBuilderImpl");
+        }
+    }
+
+    @Override
+    public Producer create() throws PulsarClientException {
+        try {
+            return createAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Producer> createAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name 
must be set on the producer builder"));
+        }
+
+        return client.createProducerAsync(topicName, conf);
+    }
+
+    @Override
+    public ProducerBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder producerName(String producerName) {
+        conf.setProducerName(producerName);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) {
+        conf.setSendTimeout(sendTimeout, unit);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder maxPendingMessages(int maxPendingMessages) {
+        conf.setMaxPendingMessages(maxPendingMessages);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder maxPendingMessagesAcrossPartitions(int 
maxPendingMessagesAcrossPartitions) {
+        
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull) {
+        conf.setBlockIfQueueFull(blockIfQueueFull);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder messageRoutingMode(MessageRoutingMode 
messageRouteMode) {
+        
conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder compressionType(CompressionType compressionType) {
+        conf.setCompressionType(compressionType);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder messageRouter(MessageRouter messageRouter) {
+        conf.setMessageRouter(messageRouter);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder enableBatching(boolean batchMessagesEnabled) {
+        conf.setBatchingEnabled(batchMessagesEnabled);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder addEncryptionKey(String key) {
+        conf.addEncryptionKey(key);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction 
action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit 
timeUnit) {
+        conf.setBatchingMaxPublishDelay(batchDelay, timeUnit);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder batchingMaxMessages(int 
batchMessagesMaxMessagesPerBatch) {
+        conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder initialSequenceId(long initialSequenceId) {
+        conf.setInitialSequenceId(initialSequenceId);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder property(String key, String value) {
+        conf.setProperty(key, value);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder properties(Map<String, String> properties) {
+        conf.setProperties(properties);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 029f712f3..6e1c5afde 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -32,13 +32,16 @@
 
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -58,6 +61,7 @@
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+@SuppressWarnings("deprecation")
 public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
@@ -92,8 +96,7 @@ public PulsarClientImpl(String serviceUrl, 
ClientConfiguration conf, EventLoopGr
     }
 
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, 
EventLoopGroup eventLoopGroup,
-            ConnectionPool cnxPool)
-            throws PulsarClientException {
+            ConnectionPool cnxPool) throws PulsarClientException {
         if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
             throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
         }
@@ -117,6 +120,21 @@ public ClientConfiguration getConfiguration() {
         return conf;
     }
 
+    @Override
+    public ProducerBuilder newProducer() {
+        return new ProducerBuilderImpl(this);
+    }
+
+    @Override
+    public ConsumerBuilder newConsumer() {
+        return new ConsumerBuilderImpl(this);
+    }
+
+    @Override
+    public ReaderBuilder newReader() {
+        return new ReaderBuilderImpl(this);
+    }
+
     @Override
     public Producer createProducer(String destination) throws 
PulsarClientException {
         try {
@@ -157,6 +175,7 @@ public Producer createProducer(final String destination, 
final ProducerConfigura
         return createProducerAsync(topic, new ProducerConfiguration());
     }
 
+    @Override
     public CompletableFuture<Producer> createProducerAsync(final String topic, 
final ProducerConfiguration conf) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
new file mode 100644
index 000000000..f3751340f
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@SuppressWarnings("deprecation")
+public class ReaderBuilderImpl implements ReaderBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+
+    private final ReaderConfiguration conf;
+    private String topicName;
+    private MessageId startMessageId;
+
+    ReaderBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ReaderConfiguration();
+    }
+
+    @Override
+    public ReaderBuilder clone() {
+        try {
+            return (ReaderBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ReaderBuilderImpl");
+        }
+    }
+
+    @Override
+    public Reader create() throws PulsarClientException {
+        try {
+            return createAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Reader> createAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name 
must be set on the reader builder"));
+        }
+
+        if (startMessageId == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Start message 
id must be set on the reader builder"));
+        }
+
+        return client.createReaderAsync(topicName, startMessageId, conf);
+    }
+
+    @Override
+    public ReaderBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder startMessageId(MessageId startMessageId) {
+        this.startMessageId = startMessageId;
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder readerListener(ReaderListener readerListener) {
+        conf.setReaderListener(readerListener);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction 
action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder receiverQueueSize(int receiverQueueSize) {
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder readerName(String readerName) {
+        conf.setReaderName(readerName);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
index 5068a2c06..f96d2885c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
@@ -18,31 +18,25 @@
  */
 package org.apache.pulsar.client.api;
 
-import org.testng.annotations.Test;
-
-import com.google.common.base.Objects;
-
 import static org.testng.Assert.assertEquals;
 
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.ConsumerId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.testng.Assert;
+import org.testng.annotations.Test;
 
 public class MessageIdTest {
-    
+
     @Test
     public void messageIdTest() {
         MessageId mId = new MessageIdImpl(1, 2, 3);
         assertEquals(mId.toString(), "1:2:3");
-        
+
         mId = new BatchMessageIdImpl(0, 2, 3, 4);
         assertEquals(mId.toString(), "0:2:3:4");
-        
+
         mId = new BatchMessageIdImpl(-1, 2, -3, 4);
         assertEquals(mId.toString(), "-1:2:-3:4");
-        
+
         mId = new MessageIdImpl(0, -23, 3);
         assertEquals(mId.toString(), "0:-23:3");
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
new file mode 100644
index 000000000..f4b89d5d4
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("deprecation")
+public class BuildersTest {
+
+    @Test
+    public void clientBuilderTest() {
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().enableTls(true).ioThreads(10)
+                
.maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");
+
+        assertEquals(clientBuilder.conf.isUseTls(), true);
+        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
+
+        ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
+        assertTrue(b2 != clientBuilder);
+
+        b2.serviceUrl("pulsar://other-broker:6650");
+
+        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
+        assertEquals(b2.serviceUrl, "pulsar://other-broker:6650");
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
index 43b7b2699..97f25b149 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
@@ -26,14 +26,14 @@
 
 public class SampleProducer {
     public static void main(String[] args) throws PulsarClientException, 
InterruptedException, IOException {
-        PulsarClient pulsarClient = 
PulsarClient.create("http://127.0.0.1:8080";);
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("http://localhost:6650";).build();
 
-        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
+        Producer producer = 
client.newProducer().topic("persistent://my-property/use/my-ns/my-topic").create();
 
         for (int i = 0; i < 10; i++) {
             producer.send("my-message".getBytes());
         }
 
-        pulsarClient.close();
+        client.close();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to