This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fbb926  Update default values for a few publisher settings (#1440)
8fbb926 is described below

commit 8fbb926089effcadf59e199fe69ec65b4bbe4eae
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Sun Apr 15 21:29:23 2018 -0700

    Update default values for a few publisher settings (#1440)
---
 .../pulsar/broker/service/AbstractReplicator.java  |   6 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  74 ++++++++--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  34 ++++-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |  71 +++++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |  42 ++++--
 .../pulsar/broker/service/BatchMessageTest.java    |  13 +-
 .../broker/service/PersistentFailoverE2ETest.java  |  13 +-
 .../broker/service/PersistentQueueE2ETest.java     |  13 +-
 .../broker/service/PersistentTopicE2ETest.java     | 149 +++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      |  16 ++-
 .../pulsar/broker/service/ReplicatorTestBase.java  |  18 ++-
 .../pulsar/broker/service/ResendRequestTest.java   |  36 +++--
 .../broker/service/TopicTerminationTest.java       |  51 +++++--
 .../pulsar/client/api/BrokerServiceLookupTest.java |   3 +-
 .../client/api/DispatcherBlockConsumerTest.java    |   4 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  |  20 ++-
 .../api/PartitionedProducerConsumerTest.java       |  19 ++-
 .../client/api/SimpleProducerConsumerStatTest.java |  10 +-
 .../client/api/SimpleProducerConsumerTest.java     |  49 +++++--
 .../client/api/v1/V1_ProducerConsumerTest.java     |  24 ++++
 .../apache/pulsar/client/impl/MessageIdTest.java   |  34 ++++-
 .../client/impl/PatternTopicsConsumerImplTest.java |  18 +++
 .../PerMessageUnAcknowledgedRedeliveryTest.java    |  18 ++-
 .../apache/pulsar/client/impl/RawReaderTest.java   |  26 +++-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  17 +++
 .../impl/UnAcknowledgedMessagesTimeoutTest.java    |  23 +++-
 .../pulsar/client/impl/ZeroQueueSizeTest.java      |  21 ++-
 .../apache/pulsar/compaction/CompactionTest.java   |  75 ++++++++---
 .../apache/pulsar/compaction/CompactorTest.java    |  16 ++-
 .../apache/pulsar/client/api/ProducerBuilder.java  |  12 +-
 .../pulsar/client/api/ProducerConfiguration.java   |  21 +--
 .../impl/conf/ProducerConfigurationData.java       |   6 +-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  13 +-
 .../apache/pulsar/websocket/ProducerHandler.java   |   3 +
 .../pulsar/tests/integration/TestCompaction.java   |   7 +-
 35 files changed, 774 insertions(+), 201 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index d3d15d9..ff85e34 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
@@ -74,7 +75,10 @@ public abstract class AbstractReplicator {
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
         this.producerBuilder = client.newProducer() //
-                .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
+                .topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
         STATE_UPDATER.set(this, State.Stopped);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4900b5b..7c24692 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -640,7 +640,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new 
PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/my-topic").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1/my-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
         admin.persistentTopics().delete("persistent://prop-xyz/ns1/my-topic");
 
@@ -804,8 +808,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         admin.persistentTopics().deleteSubscription(partitionedTopicName, 
"my-sub-1");
         
assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), 
Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
@@ -859,7 +866,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer().topic(partitionedTopicName).create();
+        producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         topics = admin.persistentTopics().getList("prop-xyz/ns1");
         assertEquals(topics.size(), 4);
@@ -918,7 +929,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -944,7 +959,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -1037,7 +1056,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1095,7 +1118,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1148,7 +1175,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1157,7 +1188,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds1").create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer1.send(message.getBytes());
@@ -1251,7 +1286,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -1296,7 +1335,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/my-topic";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         try {
             admin.persistentTopics().delete(topicName);
@@ -1684,8 +1727,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Consumer<byte[]> consumer = 
client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://prop-xyz/ns1/ds1")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://prop-xyz/ns1/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index e68a5c5..ad95065 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -173,8 +173,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
                 .toString();
 
         // (3) produce messages to all partitions including newly created 
partitions (RoundRobin)
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         final int totalMessages = newPartitions * 2;
         for (int i = 0; i < totalMessages; i++) {
             String message = "message-" + i;
@@ -268,7 +271,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnTopic(String topicName, int messages, int 
startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -331,7 +338,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().setPersistence(namespace, new 
PersistencePolicies(3, 3, 3, 50.0));
         assertEquals(admin.namespaces().getPersistence(namespace), new 
PersistencePolicies(3, 3, 3, 50.0));
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -381,7 +392,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
 
         // recreation of producer will load the topic again
-        producer = pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer().topic(topicName).create();
         topic = pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topic);
         // unload the topic
@@ -502,7 +513,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -741,7 +756,12 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(subscriberName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
client.newProducer().topic(topic).producerName(producerName).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .producerName(producerName)
+            .create();
 
         retryStrategically((test) -> {
             PersistentTopicStats stats;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 42905c7..6895036 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -805,8 +805,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         admin.persistentTopics().deleteSubscription(partitionedTopicName, 
"my-sub-1");
         
assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), 
Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
@@ -860,7 +863,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer().topic(partitionedTopicName).create();
+        producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         topics = admin.persistentTopics().getList("prop-xyz/use/ns1");
         assertEquals(topics.size(), 4);
@@ -918,7 +925,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -944,7 +955,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -1064,7 +1079,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1121,8 +1140,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1174,8 +1196,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1184,8 +1209,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds1")
-                .create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer1.send(message.getBytes());
@@ -1278,7 +1306,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -1323,7 +1355,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/my-topic";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         try {
             admin.persistentTopics().delete(topicName);
@@ -1711,8 +1747,11 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Consumer<byte[]> consumer = 
client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://prop-xyz/use/ns1/ds1")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://prop-xyz/use/ns1/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 6ac2b39..3551141 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -167,7 +167,9 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
         // (3) produce messages to all partitions including newly created 
partitions (RoundRobin)
         Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         final int totalMessages = newPartitions * 2;
         for (int i = 0; i < totalMessages; i++) {
             String message = "message-" + i;
@@ -261,7 +263,11 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnTopic(String topicName, int messages, int 
startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -324,7 +330,11 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().setPersistence(namespace, new 
PersistencePolicies(3, 3, 3, 50.0));
         assertEquals(admin.namespaces().getPersistence(namespace), new 
PersistencePolicies(3, 3, 3, 50.0));
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -495,7 +505,11 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -708,7 +722,11 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         Set<String> topicNames = Sets.newHashSet();
         for (int i = 0; i < totalTopics; i++) {
             topicNames.add(topicName + i);
-            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName + i).create();
+            Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName + i)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             producer.close();
         }
 
@@ -735,7 +753,12 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(subscriberName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
client.newProducer().topic(topic).producerName(producerName).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(topic)
+            .producerName(producerName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         retryStrategically((test) -> {
             PersistentTopicStats stats;
@@ -777,8 +800,11 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
         String topic = "persistent://prop_xyz/use/my-namespace/my-topic";
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopicStats stats = admin.persistentTopics().getStats(topic);
         assertEquals(stats.publishers.size(), 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 9f9ab35..87187c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -530,10 +531,16 @@ public class BatchMessageTest extends BrokerTestBase {
         consumer.close();
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                .batchingMaxPublishDelay(5, 
TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
-                .create();
+            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+            .batchingMaxMessages(numMsgsInBatch)
+            .enableBatching(true)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         // create producer to publish non batch messages
-        Producer<byte[]> noBatchProducer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> noBatchProducer = 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = 
Lists.newArrayList();
         for (int i = 0; i < numMsgs / 2; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 53c49d7..bc5c210 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -159,7 +159,10 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         assertEquals(subRef.getDispatcher().getType(), SubType.Failover);
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -354,7 +357,8 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -545,7 +549,10 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
         // enqueue messages
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2bc152a..4b659f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -99,7 +100,10 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
         assertEquals(subRef.getDispatcher().getType(), SubType.Shared);
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs * 2);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs * 2; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -256,8 +260,11 @@ public class PersistentQueueE2ETest extends BrokerTestBase 
{
                 }).subscribe();
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .maxPendingMessages(numMsgs + 1)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "msg-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c04e3bd..8da33cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -72,6 +73,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Ignore;
 import org.testng.annotations.Test;
 
 /**
@@ -95,7 +97,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         final String topicName = "persistent://prop/ns-abc/topic0";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -136,7 +142,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         assertEquals(getAvailablePermits(subRef), 1000 /* default */);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs * 2; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -210,7 +220,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .receiverQueueSize(recvQueueSize).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -254,7 +268,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         // (1) Create subscription
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .receiverQueueSize(recvQueueSize).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // (2) Produce Messages
         for (int i = 0; i < recvQueueSize / 2; i++) {
@@ -325,7 +343,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
             });
         }
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -352,7 +374,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         final String topicName = "persistent://prop/ns-abc/topic4";
         final String subName = "sub4";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -415,7 +441,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         final String subName = "sub5";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -443,7 +473,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         final String subName = "sub6";
 
         
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
-        pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         try {
             
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
             fail("Should have thrown an exception since one consumer is 
already connected");
@@ -676,7 +710,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         consumer.close();
         assertFalse(subRef.getDispatcher().isConsumerConnected());
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -719,7 +757,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
 
         assertTrue(subRef.getDispatcher().isConsumerConnected());
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -820,7 +862,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subName).receiverQueueSize(1000).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         assertEquals(consumer.getAvailablePermits(), 0);
 
@@ -847,7 +893,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         final String topicName = "persistent://prop/ns-abc/topic-xyz";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -896,11 +946,17 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
 
         // 1. Producer connect
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer().topic(topicName)
-                
.maxPendingMessages(messages).blockIfQueueFull(true).sendTimeout(1, 
TimeUnit.SECONDS).create();
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+            .topic(topicName)
+            .maxPendingMessages(messages)
+            .blockIfQueueFull(true)
+            .sendTimeout(1, TimeUnit.SECONDS)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Stop broker
-        cleanup();
+        super.internalCleanup();
 
         // 2. producer publish messages
         long startTime = System.nanoTime();
@@ -937,11 +993,17 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
 
         // 1. Producer connect
         PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer().topic(topicName)
-                
.maxPendingMessages(messages).blockIfQueueFull(false).sendTimeout(1, 
TimeUnit.SECONDS).create();
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+            .topic(topicName)
+            .maxPendingMessages(messages)
+            .blockIfQueueFull(false)
+            .sendTimeout(1, TimeUnit.SECONDS)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Stop broker
-        cleanup();
+        super.internalCleanup();
 
         // 2. producer publish messages
         long startTime = System.nanoTime();
@@ -980,9 +1042,16 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         BrokerService brokerService = pulsar.getBrokerService();
 
         // 1. producers connect
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").create();
-        /* Producer<byte[]> producer2 = */ 
pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2")
-                .create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/topic-1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        /* Producer<byte[]> producer2 = */ pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/topic-2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         brokerService.updateRates();
 
@@ -1021,8 +1090,12 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         final String topicName = "persistent://prop/ns-abc/topic0" + 
compressionType;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(compressionType)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -1057,7 +1130,11 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         statsUpdater.shutdown();
 
         final String namespace = "prop/ns-abc";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://" + namespace + 
"/topic0").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://" + namespace + "/topic0")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         // 1. producer publish messages
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -1086,7 +1163,10 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         final String topicName = "persistent://prop/ns-abc/topic1";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         Message<byte[]> msg1 = 
MessageBuilder.create().setContent("message-1".getBytes()).build();
@@ -1141,7 +1221,11 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // (1) Produce messages
         for (int i = 0; i < totalMessages; i++) {
@@ -1196,7 +1280,11 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -1258,8 +1346,11 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
     public void testCreateProducerWithSameName() throws Exception {
         String topic = 
"persistent://prop/ns-abc/testCreateProducerWithSameName";
 
-        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
-                .producerName("test-producer-a");
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("test-producer-a")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
         Producer<byte[]> p1 = producerBuilder.create();
 
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 50dfc3c..8d2804f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -52,6 +52,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
@@ -209,7 +210,10 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final TopicName topicName = 
TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
-        Producer<byte[]> producer = 
client1.newProducer().topic(topicName.toString()).create();
+        Producer<byte[]> producer = 
client1.newProducer().topic(topicName.toString())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
@@ -850,7 +854,10 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String topicName = 
"persistent://pulsar/ns/checksumAfterReplication";
 
         PulsarClient c1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
-        Producer<byte[]> p1 = c1.newProducer().topic(topicName).create();
+        Producer<byte[]> p1 = c1.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PulsarClient c2 = 
PulsarClient.builder().serviceUrl(url2.toString()).build();
         RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
@@ -897,7 +904,10 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         // load namespace with dummy topic on ns
         PulsarClient client = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
-        client.newProducer().topic("persistent://" + namespace + 
"/dummyTopic").create();
+        client.newProducer().topic("persistent://" + namespace + "/dummyTopic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // persistent topic test
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 7670b1b..7d411f5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -267,7 +268,11 @@ public class ReplicatorTestBase {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
-            producer = client.newProducer().topic(topicName).create();
+            producer = client.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         }
 
@@ -276,12 +281,11 @@ public class ReplicatorTestBase {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
-            ProducerBuilder<byte[]> producerBuilder = 
client.newProducer().topic(topicName);
-            if (batch) {
-                producerBuilder.enableBatching(true);
-                producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
-                producerBuilder.batchingMaxMessages(5);
-            }
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+                .topic(topicName)
+                .enableBatching(batch)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .batchingMaxMessages(5);
             producer = producerBuilder.create();
 
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index db41fa3..0c28dff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -73,7 +73,10 @@ public class ResendRequestTest extends BrokerTestBase {
         HashSet<String> messageDataHashSet = new HashSet<String>();
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -159,7 +162,10 @@ public class ResendRequestTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
@@ -239,7 +245,10 @@ public class ResendRequestTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
@@ -355,7 +364,10 @@ public class ResendRequestTest extends BrokerTestBase {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -414,7 +426,9 @@ public class ResendRequestTest extends BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -468,7 +482,8 @@ public class ResendRequestTest extends BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // 2. Create consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
@@ -564,7 +579,9 @@ public class ResendRequestTest extends BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
@@ -645,7 +662,10 @@ public class ResendRequestTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 549d2b7..9bc2730 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
@@ -65,7 +66,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test
     public void testSimpleTermination() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -84,7 +88,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test
     public void testCreateProducerOnTerminatedTopic() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -103,7 +110,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CyclicBarrier barrier = new CyclicBarrier(2);
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -148,7 +158,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test
     public void testDoubleTerminate() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -176,7 +189,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationConsumer() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
@@ -211,7 +227,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationMessageListener() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -248,7 +267,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReader() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         MessageId msgId1 = producer.send("test-msg-1".getBytes());
         MessageId msgId2 = producer.send("test-msg-2".getBytes());
@@ -277,7 +299,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReaderListener() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -311,7 +336,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopic() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         /* MessageId msgId1 = */ producer.send("test-msg-1".getBytes());
         MessageId msgId2 = producer.send("test-msg-2".getBytes());
 
@@ -327,7 +355,10 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopicWithNoMessages() throws 
Exception {
-        pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         admin.persistentTopics().terminateTopicAsync(topicName).get();
 
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 5fbf1da..5076d9d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -337,7 +337,8 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         /**** broker-2 started ****/
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index e0c46ac..aba0e1b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -600,7 +600,9 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
                 
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe();
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
         // (1) Produced Messages
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 5a8b061..12bfec5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -147,7 +147,10 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1")
                 .subscriptionType(type).subscribe();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         int totalProduceMsg = 500;
         for (int i = 0; i < totalProduceMsg; i++) {
@@ -191,7 +194,10 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName("subscriber-1")
                 .subscriptionType(type).subscribe();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // Ensure all partitions exist
         for (int i = 0; i < numPartitions; i++) {
@@ -493,7 +499,10 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) 
client3.newConsumer().topic(globalTopicName)
                     .subscriptionName("subscriber-1").subscribe();
 
-            Producer<byte[]> producer = 
client1.newProducer().topic(globalTopicName).create();
+            Producer<byte[]> producer = 
client1.newProducer().topic(globalTopicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
             Thread.sleep(timeWaitToSync);
 
@@ -778,7 +787,10 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
                     
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
 
-            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             String firstTimeConnected = producer.getConnectedSince();
             ExecutorService executor = Executors.newFixedThreadPool(5);
             byte[] msgData = "testData".getBytes();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 5830265..247a773 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -73,7 +73,8 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
@@ -250,7 +251,10 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             // ok
         }
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
 
         try {
@@ -289,7 +293,10 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         try {
-            producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+            producer = pulsarClient.newProducer().topic(topicName.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
             producer.send("message1".getBytes());
             producer.send("message2".getBytes());
@@ -343,7 +350,8 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 
.subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe();
@@ -391,7 +399,8 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 
.subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 8a536f2..fb2c2b4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -130,10 +130,13 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = consumerBuilder.subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/tp1/my-ns/my-topic2");
+                .topic("persistent://my-property/tp1/my-ns/my-topic2")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition);
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
@@ -185,10 +188,13 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = consumerBuilder.subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/tp1/my-ns/my-topic2");
+                .topic("persistent://my-property/tp1/my-ns/my-topic2")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition);
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b5dcd87..ad406ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -591,16 +591,22 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         final String topic = "persistent://my-property/my-ns/bigMsg";
 
         // (a) non-batch msg with compression
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4)
+            .create();
         Message<byte[]> message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1])
                 .build();
         producer.send(message);
         producer.close();
 
         // (b) batch-msg
-        producer = 
pulsarClient.newProducer().topic(topic).enableBatching(true).compressionType(CompressionType.LZ4)
-                .create();
+        producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(true)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4)
+            .create();
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -611,7 +617,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         producer.close();
 
         // (c) non-batch msg without compression
-        producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.NONE).create();
+        producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.NONE)
+            .create();
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -622,7 +632,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         producer.close();
 
         // (d) non-batch msg with compression and try to consume message
-        producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4).create();
+        producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4).create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
         byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
         message = MessageBuilder.create().setContent(content).build();
@@ -911,7 +925,9 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         final int totalMsg = 100;
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
             Message<byte[]> msg = 
MessageBuilder.create().setContent(message.getBytes()).build();
@@ -1294,7 +1310,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                     
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    
.topic("persistent://my-property/my-ns/unacked-topic").create();
+                .topic("persistent://my-property/my-ns/unacked-topic")
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1423,6 +1442,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 producerBuidler.enableBatching(true);
                 producerBuidler.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
                 producerBuidler.batchingMaxMessages(5);
+            } else {
+                producerBuidler.enableBatching(false);
             }
 
             Producer<byte[]> producer = producerBuidler.create();
@@ -1868,7 +1889,9 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         Consumer<byte[]> c1 = consumerBuilder.subscribe();
         Consumer<byte[]> c2 = consumerBuilder.subscribe();
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -2274,8 +2297,12 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         // 2. Producer with valid key name
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem")
-                .cryptoKeyReader(new EncKeyReader()).create();
+            .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+            .addEncryptionKey("client-rsa.pem")
+            .cryptoKeyReader(new EncKeyReader())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 80ac520..36238ba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -122,6 +122,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingEnabled(true);
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
 
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
@@ -157,6 +159,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -214,6 +218,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -249,6 +255,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", 
producerConf);
 
@@ -299,6 +307,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingMaxPublishDelay(2 * batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         producerConf.setSendTimeout(1, TimeUnit.SECONDS);
 
@@ -520,6 +530,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", 
producerConf);
         for (int i = 0; i < recvQueueSize; i++) {
@@ -627,6 +639,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
 
         // (a) non-batch msg with compression
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.LZ4);
         Producer producer = pulsarClient.createProducer(topic, producerConf);
         Message message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
@@ -649,6 +662,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
 
         // (c) non-batch msg without compression
         producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.NONE);
         producer = pulsarClient.createProducer(topic, producerConf);
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
@@ -662,6 +676,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
 
         // (d) non-batch msg with compression and try to consume message
         producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.LZ4);
         producer = pulsarClient.createProducer(topic, producerConf);
         Consumer consumer = pulsarClient.subscribe(topic, "sub1");
@@ -704,6 +719,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             producerConf.setBatchingEnabled(true);
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
 
         /************ usecase-1: *************/
@@ -959,6 +976,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
 
         final int totalMsg = 100;
         final ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
@@ -1358,6 +1376,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
                     
.subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", 
conf);
 
             ProducerConfiguration producerConf = new ProducerConfiguration();
+            producerConf.setBatchingEnabled(false);
 
             Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
                     producerConf);
@@ -1494,6 +1513,8 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
                 producerConf.setBatchingEnabled(true);
                 producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
                 producerConf.setBatchingMaxMessages(5);
+            } else {
+                producerConf.setBatchingEnabled(false);
             }
 
             Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
@@ -1961,6 +1982,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
         Consumer c2 = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", 
"my-subscriber-name",
                 conf1);
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
@@ -2366,6 +2388,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
         final int totalMsg = 10;
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
 
         Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -2389,6 +2412,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
         producerConf = new ProducerConfiguration();
         producerConf.setCryptoKeyReader(new EncKeyReader());
         producerConf.addEncryptionKey("client-rsa.pem");
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1",
                 producerConf);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 6d6fc92..48775e0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -89,7 +90,10 @@ public class MessageIdTest extends BrokerTestBase {
         final int numberOfMessages = 30;
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -181,7 +185,10 @@ public class MessageIdTest extends BrokerTestBase {
         admin.persistentTopics().createPartitionedTopic(topicName, 
numberOfPartitions);
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -230,7 +237,10 @@ public class MessageIdTest extends BrokerTestBase {
         admin.persistentTopics().createPartitionedTopic(topicName, 
numberOfPartitions);
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -279,7 +289,10 @@ public class MessageIdTest extends BrokerTestBase {
         final String topicName = "persistent://prop/use/ns-abc/topic1";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         // return higher version compare to broker : so, it forces 
client-producer to remove checksum from payload
         doReturn(producer.brokerChecksumSupportedVersion() + 
1).when(producer).brokerChecksumSupportedVersion();
@@ -344,7 +357,10 @@ public class MessageIdTest extends BrokerTestBase {
         final String topicName = "persistent://prop/use/ns-abc/topic1";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         // mock: broker-doesn't support checksum (remote_version < 
brokerChecksumSupportedVersion) so, it forces
         // client-producer to perform checksum-strip from msg at reconnection
@@ -420,8 +436,12 @@ public class MessageIdTest extends BrokerTestBase {
         final String topicName = "persistent://prop/use/ns-abc/retry-topic";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
-                .sendTimeout(10, TimeUnit.MINUTES).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .sendTimeout(10, TimeUnit.MINUTES)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         Field producerIdField = 
ProducerImpl.class.getDeclaredField("producerId");
         producerIdField.setAccessible(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 2be239f..8e7c71d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -32,6 +32,7 @@ import java.util.stream.IntStream;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -144,11 +145,15 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -301,11 +306,15 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -368,11 +377,15 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -415,6 +428,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" 
+ key;
         admin.persistentTopics().createPartitionedTopic(topicName4, 4);
         Producer<byte[]> producer4 = 
pulsarClient.newProducer().topic(topicName4)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -471,11 +485,15 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 2651ce8..14ed49a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -61,7 +61,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -156,7 +159,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest 
extends BrokerTestBase {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -251,7 +257,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest 
extends BrokerTestBase {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -356,7 +365,8 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index c321181..c7028a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
@@ -77,7 +78,12 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     private Set<String> publishMessages(String topic, int count) throws 
Exception {
         Set<String> keys = new HashSet<>();
 
-        try (Producer<byte[]> producer = 
pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) {
+        try (Producer<byte[]> producer = pulsarClient.newProducer()
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .maxPendingMessages(count)
+            .topic(topic)
+            .create()) {
             Future<?> lastFuture = null;
             for (int i = 0; i < count; i++) {
                 String key = "key"+i;
@@ -232,8 +238,13 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/my-ns/my-raw-topic";
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()) {
             producer.sendAsync(MessageBuilder.create()
                                
.setKey("key1").setContent("my-content-1".getBytes()).build());
             producer.sendAsync(MessageBuilder.create()
@@ -265,8 +276,13 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     public void testBatchingRebatch() throws Exception {
         String topic = "persistent://my-property/my-ns/my-raw-topic";
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()) {
             producer.sendAsync(MessageBuilder.create()
                                
.setKey("key1").setContent("my-content-1".getBytes()).build());
             producer.sendAsync(MessageBuilder.create()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 50d6e73..8ff745f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -35,6 +35,7 @@ import java.util.stream.IntStream;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -151,11 +152,15 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -212,11 +217,15 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -291,11 +300,15 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -431,11 +444,15 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index e6adf5c..42c957f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -64,7 +64,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -119,7 +122,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -166,7 +172,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -252,7 +260,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -341,7 +351,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         final int totalMessages = 3;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 6cdc506..2f23554 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -94,7 +95,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -128,7 +132,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         int numOfSubscribers = 4;
@@ -166,7 +173,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -217,11 +227,14 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
                 .subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://prop-xyz/use/ns-abc/topic1");
+            .topic("persistent://prop-xyz/use/ns-abc/topic1")
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
 
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 35e7806..43f8bb2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -94,7 +95,11 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         final int numMessages = 20;
         final int maxKeys = 10;
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
         List<Pair<String, byte[]>> all = new ArrayList<>();
@@ -147,7 +152,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testReadCompactedBeforeCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -185,7 +193,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testReadEntriesAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -214,7 +225,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testSeekEarliestAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
         
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
@@ -253,7 +267,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testBrokerRestartAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -293,7 +310,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testCompactEmptyTopic() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -317,7 +337,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producer = pulsarClient.createProducer(topic)) {
+        try (Producer producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
             producer.sendAsync(MessageBuilder.create()
                                .setKey("key1")
                                .setContent("my-message-1".getBytes()).build());
@@ -365,9 +385,14 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()
+        ) {
             producer.sendAsync(MessageBuilder.create()
                                .setKey("key1")
                                .setContent("my-message-1".getBytes()).build());
@@ -425,10 +450,17 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
-             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producerNormal = pulsarClient.newProducer().topic(topic)
+                 .enableBatching(false)
+                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                 .create();
+             Producer producerBatch = pulsarClient.newProducer().topic(topic)
+                 .maxPendingMessages(3)
+                 .enableBatching(true)
+                 .batchingMaxMessages(3)
+                 .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                 .create()) {
             producerBatch.sendAsync(MessageBuilder.create()
                                     .setKey("key1")
                                     
.setContent("my-message-1".getBytes()).build());
@@ -533,10 +565,17 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
-             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producerNormal = pulsarClient.newProducer()
+                 .topic(topic)
+                 .enableBatching(false)
+                 .create();
+             Producer producerBatch = pulsarClient.newProducer()
+                 .topic(topic)
+                 .maxPendingMessages(3)
+                 .enableBatching(true)
+                 .batchingMaxMessages(3)
+                 .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                 .create()) {
 
             // key0 persists through it all
             producerNormal.sendAsync(MessageBuilder.create()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 37446ad..7bd5adc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -117,7 +118,10 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         final int numMessages = 1000;
         final int maxKeys = 10;
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
         Random r = new Random(0);
@@ -138,7 +142,10 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testCompactAddCompact() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
 
@@ -168,7 +175,10 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testCompactedInOrder() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         producer.send(MessageBuilder.create()
                       .setKey("c")
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 03d82a7..a9723a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -115,7 +115,7 @@ public interface ProducerBuilder<T> extends Serializable, 
Cloneable {
      * Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker.
      * <p>
      * When the queue is full, by default, all calls to {@link Producer#send} 
and {@link Producer#sendAsync} will fail
-     * unless blockIfQueueFull is set to true. Use {@link 
#setBlockIfQueueFull} to change the blocking behavior.
+     * unless blockIfQueueFull is set to true. Use {@link 
#blockIfQueueFull(boolean)} to change the blocking behavior.
      *
      * @param maxPendingMessages
      * @return
@@ -148,13 +148,15 @@ public interface ProducerBuilder<T> extends Serializable, 
Cloneable {
     /**
      * Set the message routing mode for the partitioned producer.
      *
-     * Default routing mode for messages to partition.
+     * Default routing mode is round-robin routing.
      *
      * This logic is applied when the application is not setting a key {@link 
MessageBuilder#setKey(String)} on a
      * particular message.
      *
      * @param messageRoutingMode
      *            the message routing mode
+     * @return producer builder
+     * @see MessageRoutingMode
      */
     ProducerBuilder<T> messageRoutingMode(MessageRoutingMode 
messageRoutingMode);
 
@@ -205,9 +207,13 @@ public interface ProducerBuilder<T> extends Serializable, 
Cloneable {
      * messages will be compressed at the batch level, leading to a much 
better compression ratio for similar headers or
      * contents.
      *
-     * When enabled default batch delay is set to 10 ms and default batch size 
is 1000 messages
+     * When enabled default batch delay is set to 1 ms and default batch size 
is 1000 messages
      *
+     * <p>Batching is enabled by default since 2.0.0.
+     *
+     * @return producer builder.
      * @see #batchingMaxPublishDelay(long, TimeUnit)
+     * @see #batchingMaxMessages(int)
      */
     ProducerBuilder<T> enableBatching(boolean enableBatching);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 49cd1d8..1af077b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -184,10 +184,11 @@ public class ProducerConfiguration implements 
Serializable {
     }
 
     /**
-     * Set the message routing mode for the partitioned producer
+     * Set the message routing mode for the partitioned producer.
      *
-     * @param mode
-     * @return
+     * @param messageRouteMode message routing mode.
+     * @return producer configuration
+     * @see MessageRoutingMode
      */
     public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode 
messageRouteMode) {
         checkNotNull(messageRouteMode);
@@ -197,9 +198,10 @@ public class ProducerConfiguration implements Serializable 
{
     }
 
     /**
-     * Get the message routing mode for the partitioned producer
+     * Get the message routing mode for the partitioned producer.
      *
-     * @return
+     * @return message routing mode, default is round-robin routing.
+     * @see MessageRoutingMode#RoundRobinPartition
      */
     public MessageRoutingMode getMessageRoutingMode() {
         return 
MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
@@ -269,9 +271,12 @@ public class ProducerConfiguration implements Serializable 
{
     }
 
     /**
-     * @ return if batch messages are enabled
+     * Return the flag whether automatic message batching is enabled or not.
+     *
+     * @return true if batch messages are enabled. otherwise false.
+     * @since 2.0.0 <br>
+     *        It is enabled by default.
      */
-
     public boolean getBatchingEnabled() {
         return conf.isBatchingEnabled();
     }
@@ -290,8 +295,8 @@ public class ProducerConfiguration implements Serializable {
      * @since 1.0.36 <br>
      *        Make sure all the consumer applications have been updated to use 
this client version, before starting to
      *        batch messages.
+     *
      */
-
     public ProducerConfiguration setBatchingEnabled(boolean 
batchMessagesEnabled) {
         conf.setBatchingEnabled(batchMessagesEnabled);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 1449a45..9f06809 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -50,7 +50,7 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
     private boolean blockIfQueueFull = false;
     private int maxPendingMessages = 1000;
     private int maxPendingMessagesAcrossPartitions = 50000;
-    private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.SinglePartition;
+    private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.RoundRobinPartition;
     private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
 
     private ProducerCryptoFailureAction cryptoFailureAction = 
ProducerCryptoFailureAction.FAIL;
@@ -58,9 +58,9 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
     @JsonIgnore
     private MessageRouter customMessageRouter = null;
 
-    private long batchingMaxPublishDelayMicros = 
TimeUnit.MILLISECONDS.toMicros(10);
+    private long batchingMaxPublishDelayMicros = 
TimeUnit.MILLISECONDS.toMicros(1);
     private int batchingMaxMessages = 1000;
-    private boolean batchingEnabled = false; // disabled by default
+    private boolean batchingEnabled = true; // enabled by default
 
     @JsonIgnore
     private CryptoKeyReader cryptoKeyReader;
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ee9d048..651056b 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -85,8 +85,11 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
     public void testProducerConsumer() throws Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://sample/test/local/producer-consumer-topic")
-                .create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://sample/test/local/producer-consumer-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -116,8 +119,10 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
                 .build();
         
admin.persistentTopics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic",
 2);
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://sample/test/local/partitioned-topic")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://sample/test/local/partitioned-topic")
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index c2c3ee9..5ed8adf 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -251,6 +251,9 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
     private ProducerConfiguration getProducerConfiguration() {
         ProducerConfiguration conf = new ProducerConfiguration();
 
+        conf.setBatchingEnabled(false);
+        conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+
         // Set to false to prevent the server thread from being blocked if a 
lot of messages are pending.
         conf.setBlockIfQueueFull(false);
 
diff --git 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index 43c3866..3611736 100644
--- 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.tests.DockerUtils;
@@ -65,7 +66,11 @@ public class TestCompaction extends Arquillian {
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
+            try(Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()) {
                 
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
                 
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
             }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to