This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8fbb926 Update default values for a few publisher settings (#1440) 8fbb926 is described below commit 8fbb926089effcadf59e199fe69ec65b4bbe4eae Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sun Apr 15 21:29:23 2018 -0700 Update default values for a few publisher settings (#1440) --- .../pulsar/broker/service/AbstractReplicator.java | 6 +- .../apache/pulsar/broker/admin/AdminApiTest.java | 74 ++++++++-- .../apache/pulsar/broker/admin/AdminApiTest2.java | 34 ++++- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 71 +++++++--- .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 42 ++++-- .../pulsar/broker/service/BatchMessageTest.java | 13 +- .../broker/service/PersistentFailoverE2ETest.java | 13 +- .../broker/service/PersistentQueueE2ETest.java | 13 +- .../broker/service/PersistentTopicE2ETest.java | 149 +++++++++++++++++---- .../pulsar/broker/service/ReplicatorTest.java | 16 ++- .../pulsar/broker/service/ReplicatorTestBase.java | 18 ++- .../pulsar/broker/service/ResendRequestTest.java | 36 +++-- .../broker/service/TopicTerminationTest.java | 51 +++++-- .../pulsar/client/api/BrokerServiceLookupTest.java | 3 +- .../client/api/DispatcherBlockConsumerTest.java | 4 +- .../pulsar/client/api/NonPersistentTopicTest.java | 20 ++- .../api/PartitionedProducerConsumerTest.java | 19 ++- .../client/api/SimpleProducerConsumerStatTest.java | 10 +- .../client/api/SimpleProducerConsumerTest.java | 49 +++++-- .../client/api/v1/V1_ProducerConsumerTest.java | 24 ++++ .../apache/pulsar/client/impl/MessageIdTest.java | 34 ++++- .../client/impl/PatternTopicsConsumerImplTest.java | 18 +++ .../PerMessageUnAcknowledgedRedeliveryTest.java | 18 ++- .../apache/pulsar/client/impl/RawReaderTest.java | 26 +++- .../pulsar/client/impl/TopicsConsumerImplTest.java | 17 +++ .../impl/UnAcknowledgedMessagesTimeoutTest.java | 23 +++- .../pulsar/client/impl/ZeroQueueSizeTest.java | 21 ++- .../apache/pulsar/compaction/CompactionTest.java | 75 ++++++++--- .../apache/pulsar/compaction/CompactorTest.java | 16 ++- .../apache/pulsar/client/api/ProducerBuilder.java | 12 +- .../pulsar/client/api/ProducerConfiguration.java | 21 +-- .../impl/conf/ProducerConfigurationData.java | 6 +- .../org/apache/pulsar/proxy/server/ProxyTest.java | 13 +- .../apache/pulsar/websocket/ProducerHandler.java | 3 + .../pulsar/tests/integration/TestCompaction.java | 7 +- 35 files changed, 774 insertions(+), 201 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d3d15d9..ff85e34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; @@ -74,7 +75,10 @@ public abstract class AbstractReplicator { this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerBuilder = client.newProducer() // - .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) // + .topic(topicName) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 4900b5b..7c24692 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -640,7 +640,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0)); // Force topic creation and namespace being loaded - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/my-topic").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1/my-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); admin.persistentTopics().delete("persistent://prop-xyz/ns1/my-topic"); @@ -804,8 +808,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1"); assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); - Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; @@ -859,7 +866,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } catch (ConflictException ce) { } - producer = client.newProducer().topic(partitionedTopicName).create(); + producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); topics = admin.persistentTopics().getList("prop-xyz/ns1"); assertEquals(topics.size(), 4); @@ -918,7 +929,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // Force to create a topic final String namespace = "prop-xyz/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -944,7 +959,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // Force to create a topic final String namespace = "prop-xyz/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -1037,7 +1056,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1095,7 +1118,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1148,7 +1175,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { .subscribe(); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1157,7 +1188,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { producer.close(); // Create producer - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds1").create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer1.send(message.getBytes()); @@ -1251,7 +1286,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -1296,7 +1335,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testDeleteFailedReturnCode() throws Exception { String topicName = "persistent://prop-xyz/ns1/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { admin.persistentTopics().delete(topicName); @@ -1684,8 +1727,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1") .subscriptionName("my-sub").subscribe(); - Producer<byte[]> producer = client.newProducer().topic("persistent://prop-xyz/ns1/ds1") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://prop-xyz/ns1/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index e68a5c5..ad95065 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -173,8 +173,11 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { .toString(); // (3) produce messages to all partitions including newly created partitions (RoundRobin) - Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); final int totalMessages = newPartitions * 2; for (int i = 0; i < totalMessages; i++) { String message = "message-" + i; @@ -268,7 +271,11 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { } private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -331,7 +338,11 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0)); assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0)); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); @@ -381,7 +392,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // recreation of producer will load the topic again - producer = pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer().topic(topicName).create(); topic = pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topic); // unload the topic @@ -502,7 +513,11 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -741,7 +756,12 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = client.newProducer().topic(topic).producerName(producerName).create(); + Producer<byte[]> producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .producerName(producerName) + .create(); retryStrategically((test) -> { PersistentTopicStats stats; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 42905c7..6895036 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -805,8 +805,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1"); assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); - Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; @@ -860,7 +863,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { } catch (ConflictException ce) { } - producer = client.newProducer().topic(partitionedTopicName).create(); + producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); topics = admin.persistentTopics().getList("prop-xyz/use/ns1"); assertEquals(topics.size(), 4); @@ -918,7 +925,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { // Force to create a topic final String namespace = "prop-xyz/use/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -944,7 +955,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { // Force to create a topic final String namespace = "prop-xyz/use/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -1064,7 +1079,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1121,8 +1140,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2") - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1174,8 +1196,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { .subscribe(); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2") - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1184,8 +1209,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { producer.close(); // Create producer - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds1") - .create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer1.send(message.getBytes()); @@ -1278,7 +1306,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -1323,7 +1355,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testDeleteFailedReturnCode() throws Exception { String topicName = "persistent://prop-xyz/use/ns1/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { admin.persistentTopics().delete(topicName); @@ -1711,8 +1747,11 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1") .subscriptionName("my-sub").subscribe(); - Producer<byte[]> producer = client.newProducer().topic("persistent://prop-xyz/use/ns1/ds1") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://prop-xyz/use/ns1/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java index 6ac2b39..3551141 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java @@ -167,7 +167,9 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { // (3) produce messages to all partitions including newly created partitions (RoundRobin) Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); final int totalMessages = newPartitions * 2; for (int i = 0; i < totalMessages; i++) { String message = "message-" + i; @@ -261,7 +263,11 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { } private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -324,7 +330,11 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0)); assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0)); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); @@ -495,7 +505,11 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -708,7 +722,11 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { Set<String> topicNames = Sets.newHashSet(); for (int i = 0; i < totalTopics; i++) { topicNames.add(topicName + i); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName + i) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); } @@ -735,7 +753,12 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = client.newProducer().topic(topic).producerName(producerName).create(); + Producer<byte[]> producer = client.newProducer() + .topic(topic) + .producerName(producerName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); retryStrategically((test) -> { PersistentTopicStats stats; @@ -777,8 +800,11 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { String topic = "persistent://prop_xyz/use/my-namespace/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopicStats stats = admin.persistentTopics().getStats(topic); assertEquals(stats.publishers.size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 9f9ab35..87187c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.FutureUtil; @@ -530,10 +531,16 @@ public class BatchMessageTest extends BrokerTestBase { consumer.close(); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true) - .create(); + .batchingMaxPublishDelay(5, TimeUnit.SECONDS) + .batchingMaxMessages(numMsgsInBatch) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // create producer to publish non batch messages - Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs / 2; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 53c49d7..bc5c210 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -159,7 +159,10 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { assertEquals(subRef.getDispatcher().getType(), SubType.Failover); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); @@ -354,7 +357,8 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); @@ -545,7 +549,10 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { // enqueue messages List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 2bc152a..4b659f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -99,7 +100,10 @@ public class PersistentQueueE2ETest extends BrokerTestBase { assertEquals(subRef.getDispatcher().getType(), SubType.Shared); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs * 2); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs * 2; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); @@ -256,8 +260,11 @@ public class PersistentQueueE2ETest extends BrokerTestBase { }).subscribe(); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .maxPendingMessages(numMsgs + 1) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "msg-" + i; futures.add(producer.sendAsync(message.getBytes())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index c04e3bd..8da33cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -72,6 +73,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; +import org.testng.annotations.Ignore; import org.testng.annotations.Test; /** @@ -95,7 +97,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String topicName = "persistent://prop/ns-abc/topic0"; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -136,7 +142,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertEquals(getAvailablePermits(subRef), 1000 /* default */); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs * 2; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -210,7 +220,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .receiverQueueSize(recvQueueSize).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -254,7 +268,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { // (1) Create subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .receiverQueueSize(recvQueueSize).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (2) Produce Messages for (int i = 0; i < recvQueueSize / 2; i++) { @@ -325,7 +343,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { }); } - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -352,7 +374,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String topicName = "persistent://prop/ns-abc/topic4"; final String subName = "sub4"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -415,7 +441,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String subName = "sub5"; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -443,7 +473,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String subName = "sub6"; pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); fail("Should have thrown an exception since one consumer is already connected"); @@ -676,7 +710,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { consumer.close(); assertFalse(subRef.getDispatcher().isConsumerConnected()); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -719,7 +757,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { assertTrue(subRef.getDispatcher().isConsumerConnected()); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -820,7 +862,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subName).receiverQueueSize(1000).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); assertEquals(consumer.getAvailablePermits(), 0); @@ -847,7 +893,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String topicName = "persistent://prop/ns-abc/topic-xyz"; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -896,11 +946,17 @@ public class PersistentTopicE2ETest extends BrokerTestBase { PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); // 1. Producer connect - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topicName) - .maxPendingMessages(messages).blockIfQueueFull(true).sendTimeout(1, TimeUnit.SECONDS).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer() + .topic(topicName) + .maxPendingMessages(messages) + .blockIfQueueFull(true) + .sendTimeout(1, TimeUnit.SECONDS) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Stop broker - cleanup(); + super.internalCleanup(); // 2. producer publish messages long startTime = System.nanoTime(); @@ -937,11 +993,17 @@ public class PersistentTopicE2ETest extends BrokerTestBase { // 1. Producer connect PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topicName) - .maxPendingMessages(messages).blockIfQueueFull(false).sendTimeout(1, TimeUnit.SECONDS).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer() + .topic(topicName) + .maxPendingMessages(messages) + .blockIfQueueFull(false) + .sendTimeout(1, TimeUnit.SECONDS) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Stop broker - cleanup(); + super.internalCleanup(); // 2. producer publish messages long startTime = System.nanoTime(); @@ -980,9 +1042,16 @@ public class PersistentTopicE2ETest extends BrokerTestBase { BrokerService brokerService = pulsar.getBrokerService(); // 1. producers connect - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").create(); - /* Producer<byte[]> producer2 = */ pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2") - .create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop/ns-abc/topic-1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + /* Producer<byte[]> producer2 = */ pulsarClient.newProducer() + .topic("persistent://prop/ns-abc/topic-2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); brokerService.updateRates(); @@ -1021,8 +1090,12 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String topicName = "persistent://prop/ns-abc/topic0" + compressionType; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(compressionType) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -1057,7 +1130,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { statsUpdater.shutdown(); final String namespace = "prop/ns-abc"; - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/topic0").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://" + namespace + "/topic0") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 1. producer publish messages for (int i = 0; i < 10; i++) { String message = "my-message-" + i; @@ -1086,7 +1163,10 @@ public class PersistentTopicE2ETest extends BrokerTestBase { final String topicName = "persistent://prop/ns-abc/topic1"; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); Message<byte[]> msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build(); @@ -1141,7 +1221,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (1) Produce messages for (int i = 0; i < totalMessages; i++) { @@ -1196,7 +1280,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -1258,8 +1346,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase { public void testCreateProducerWithSameName() throws Exception { String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName"; - ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic) - .producerName("test-producer-a"); + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() + .topic(topic) + .producerName("test-producer-a") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition); Producer<byte[]> p1 = producerBuilder.create(); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 50dfc3c..8d2804f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; @@ -209,7 +210,10 @@ public class ReplicatorTest extends ReplicatorTestBase { final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0)); PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); - Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()).create(); + Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get(); @@ -850,7 +854,10 @@ public class ReplicatorTest extends ReplicatorTestBase { final String topicName = "persistent://pulsar/ns/checksumAfterReplication"; PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); - Producer<byte[]> p1 = c1.newProducer().topic(topicName).create(); + Producer<byte[]> p1 = c1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PulsarClient c2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); RawReader reader2 = RawReader.create(c2, topicName, "sub").get(); @@ -897,7 +904,10 @@ public class ReplicatorTest extends ReplicatorTestBase { // load namespace with dummy topic on ns PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString()).build(); - client.newProducer().topic("persistent://" + namespace + "/dummyTopic").create(); + client.newProducer().topic("persistent://" + namespace + "/dummyTopic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // persistent topic test try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 7670b1b..7d411f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -267,7 +268,11 @@ public class ReplicatorTestBase { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - producer = client.newProducer().topic(topicName).create(); + producer = client.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); } @@ -276,12 +281,11 @@ public class ReplicatorTestBase { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topicName); - if (batch) { - producerBuilder.enableBatching(true); - producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS); - producerBuilder.batchingMaxMessages(5); - } + ProducerBuilder<byte[]> producerBuilder = client.newProducer() + .topic(topicName) + .enableBatching(batch) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .batchingMaxMessages(5); producer = producerBuilder.create(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index db41fa3..0c28dff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -73,7 +73,10 @@ public class ResendRequestTest extends BrokerTestBase { HashSet<String> messageDataHashSet = new HashSet<String>(); // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -159,7 +162,10 @@ public class ResendRequestTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -239,7 +245,10 @@ public class ResendRequestTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -355,7 +364,10 @@ public class ResendRequestTest extends BrokerTestBase { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -414,7 +426,9 @@ public class ResendRequestTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -468,7 +482,8 @@ public class ResendRequestTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) @@ -564,7 +579,9 @@ public class ResendRequestTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) @@ -645,7 +662,10 @@ public class ResendRequestTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index 549d2b7..9bc2730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; @@ -65,7 +66,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test public void testSimpleTermination() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -84,7 +88,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test public void testCreateProducerOnTerminatedTopic() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -103,7 +110,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testTerminateWhilePublishing() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CyclicBarrier barrier = new CyclicBarrier(2); List<CompletableFuture<MessageId>> futures = new ArrayList<>(); @@ -148,7 +158,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test public void testDoubleTerminate() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -176,7 +189,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSimpleTerminationConsumer() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); @@ -211,7 +227,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSimpleTerminationMessageListener() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(1); @@ -248,7 +267,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSimpleTerminationReader() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); MessageId msgId1 = producer.send("test-msg-1".getBytes()); MessageId msgId2 = producer.send("test-msg-2".getBytes()); @@ -277,7 +299,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSimpleTerminationReaderListener() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(1); @@ -311,7 +336,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSubscribeOnTerminatedTopic() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */ producer.send("test-msg-1".getBytes()); MessageId msgId2 = producer.send("test-msg-2".getBytes()); @@ -327,7 +355,10 @@ public class TopicTerminationTest extends BrokerTestBase { @Test(timeOut = 20000) public void testSubscribeOnTerminatedTopicWithNoMessages() throws Exception { - pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); admin.persistentTopics().terminateTopicAsync(topicName).get(); org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 5fbf1da..5076d9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -337,7 +337,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { /**** broker-2 started ****/ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index e0c46ac..aba0e1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -600,7 +600,9 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(totalProducedMsgs); // (1) Produced Messages diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 5a8b061..12bfec5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -147,7 +147,10 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") .subscriptionType(type).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); int totalProduceMsg = 500; for (int i = 0; i < totalProduceMsg; i++) { @@ -191,7 +194,10 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1") .subscriptionType(type).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // Ensure all partitions exist for (int i = 0; i < numPartitions; i++) { @@ -493,7 +499,10 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) client3.newConsumer().topic(globalTopicName) .subscriptionName("subscriber-1").subscribe(); - Producer<byte[]> producer = client1.newProducer().topic(globalTopicName).create(); + Producer<byte[]> producer = client1.newProducer().topic(globalTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Thread.sleep(timeWaitToSync); @@ -778,7 +787,10 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2") .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe(); - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); String firstTimeConnected = producer.getConnectedSince(); ExecutorService executor = Executors.newFixedThreadPool(5); byte[] msgData = "testData".getBytes(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index 5830265..247a773 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -73,7 +73,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); @@ -250,7 +251,10 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { // ok } - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); try { @@ -289,7 +293,10 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { } try { - producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + producer = pulsarClient.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe(); producer.send("message1".getBytes()); producer.send("message2".getBytes()); @@ -343,7 +350,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe(); @@ -391,7 +399,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 8a536f2..fb2c2b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -130,10 +130,13 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { Consumer<byte[]> consumer = consumerBuilder.subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://my-property/tp1/my-ns/my-topic2"); + .topic("persistent://my-property/tp1/my-ns/my-topic2") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = producerBuilder.create(); @@ -185,10 +188,13 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { Consumer<byte[]> consumer = consumerBuilder.subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://my-property/tp1/my-ns/my-topic2"); + .topic("persistent://my-property/tp1/my-ns/my-topic2") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = producerBuilder.create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index b5dcd87..ad406ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -591,16 +591,22 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { final String topic = "persistent://my-property/my-ns/bigMsg"; // (a) non-batch msg with compression - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4) + .create(); Message<byte[]> message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]) .build(); producer.send(message); producer.close(); // (b) batch-msg - producer = pulsarClient.newProducer().topic(topic).enableBatching(true).compressionType(CompressionType.LZ4) - .create(); + producer = pulsarClient.newProducer().topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4) + .create(); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); try { producer.send(message); @@ -611,7 +617,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer.close(); // (c) non-batch msg without compression - producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.NONE).create(); + producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.NONE) + .create(); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); try { producer.send(message); @@ -622,7 +632,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer.close(); // (d) non-batch msg with compression and try to consume message - producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4).create(); + producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe(); byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10]; message = MessageBuilder.create().setContent(content).build(); @@ -911,7 +925,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { final int totalMsg = 100; Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < totalMsg; i++) { final String message = "my-message-" + i; Message<byte[]> msg = MessageBuilder.create().setContent(message.getBytes()).build(); @@ -1294,7 +1310,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() - .topic("persistent://my-property/my-ns/unacked-topic").create(); + .topic("persistent://my-property/my-ns/unacked-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (1) Produced Messages for (int i = 0; i < totalProducedMsgs; i++) { @@ -1423,6 +1442,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producerBuidler.enableBatching(true); producerBuidler.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerBuidler.batchingMaxMessages(5); + } else { + producerBuidler.enableBatching(false); } Producer<byte[]> producer = producerBuidler.create(); @@ -1868,7 +1889,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { Consumer<byte[]> c1 = consumerBuilder.subscribe(); Consumer<byte[]> c2 = consumerBuilder.subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); List<Future<MessageId>> futures = Lists.newArrayList(); // Asynchronously produce messages @@ -2274,8 +2297,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // 2. Producer with valid key name Producer<byte[]> producer = pulsarClient.newProducer() - .topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem") - .cryptoKeyReader(new EncKeyReader()).create(); + .topic("persistent://my-property/use/myenc-ns/myenc-topic1") + .addEncryptionKey("client-rsa.pem") + .cryptoKeyReader(new EncKeyReader()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < totalMsg; i++) { String message = "my-message-" + i; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 80ac520..36238ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -122,6 +122,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); @@ -157,6 +159,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -214,6 +218,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -249,6 +255,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", producerConf); @@ -299,6 +307,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } producerConf.setSendTimeout(1, TimeUnit.SECONDS); @@ -520,6 +530,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", producerConf); for (int i = 0; i < recvQueueSize; i++) { @@ -627,6 +639,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { // (a) non-batch msg with compression ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.LZ4); Producer producer = pulsarClient.createProducer(topic, producerConf); Message message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); @@ -649,6 +662,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { // (c) non-batch msg without compression producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.NONE); producer = pulsarClient.createProducer(topic, producerConf); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); @@ -662,6 +676,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { // (d) non-batch msg with compression and try to consume message producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.LZ4); producer = pulsarClient.createProducer(topic, producerConf); Consumer consumer = pulsarClient.subscribe(topic, "sub1"); @@ -704,6 +719,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } /************ usecase-1: *************/ @@ -959,6 +976,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { final int totalMsg = 100; final ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); for (int i = 0; i < totalMsg; i++) { final String message = "my-message-" + i; @@ -1358,6 +1376,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", producerConf); @@ -1494,6 +1513,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", @@ -1961,6 +1982,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { Consumer c2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name", conf1); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -2366,6 +2388,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { final int totalMsg = 10; ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Message msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -2389,6 +2412,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { producerConf = new ProducerConfiguration(); producerConf.setCryptoKeyReader(new EncKeyReader()); producerConf.addEncryptionKey("client-rsa.pem"); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1", producerConf); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 6d6fc92..48775e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; @@ -89,7 +90,10 @@ public class MessageIdTest extends BrokerTestBase { final int numberOfMessages = 30; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -181,7 +185,10 @@ public class MessageIdTest extends BrokerTestBase { admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions); // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -230,7 +237,10 @@ public class MessageIdTest extends BrokerTestBase { admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions); // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -279,7 +289,10 @@ public class MessageIdTest extends BrokerTestBase { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); ProducerImpl<byte[]> producer = spy(prod); // return higher version compare to broker : so, it forces client-producer to remove checksum from payload doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion(); @@ -344,7 +357,10 @@ public class MessageIdTest extends BrokerTestBase { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); ProducerImpl<byte[]> producer = spy(prod); // mock: broker-doesn't support checksum (remote_version < brokerChecksumSupportedVersion) so, it forces // client-producer to perform checksum-strip from msg at reconnection @@ -420,8 +436,12 @@ public class MessageIdTest extends BrokerTestBase { final String topicName = "persistent://prop/use/ns-abc/retry-topic"; // 1. producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) - .sendTimeout(10, TimeUnit.MINUTES).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .sendTimeout(10, TimeUnit.MINUTES) + .create(); ProducerImpl<byte[]> producer = spy(prod); Field producerIdField = ProducerImpl.class.getDeclaredField("producerId"); producerIdField.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 2be239f..8e7c71d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -32,6 +32,7 @@ import java.util.stream.IntStream; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -144,11 +145,15 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -301,11 +306,15 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -368,11 +377,15 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -415,6 +428,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" + key; admin.persistentTopics().createPartitionedTopic(topicName4, 4); Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -471,11 +485,15 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 2651ce8..14ed49a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -61,7 +61,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -156,7 +159,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -251,7 +257,10 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -356,7 +365,8 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index c321181..c7028a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; @@ -77,7 +78,12 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { private Set<String> publishMessages(String topic, int count) throws Exception { Set<String> keys = new HashSet<>(); - try (Producer<byte[]> producer = pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) { + try (Producer<byte[]> producer = pulsarClient.newProducer() + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .maxPendingMessages(count) + .topic(topic) + .create()) { Future<?> lastFuture = null; for (int i = 0; i < count; i++) { String key = "key"+i; @@ -232,8 +238,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { public void testBatchingExtractKeysAndIds() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic"; - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.sendAsync(MessageBuilder.create() .setKey("key1").setContent("my-content-1".getBytes()).build()); producer.sendAsync(MessageBuilder.create() @@ -265,8 +276,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { public void testBatchingRebatch() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic"; - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.sendAsync(MessageBuilder.create() .setKey("key1").setContent("my-content-1".getBytes()).build()); producer.sendAsync(MessageBuilder.create() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 50d6e73..8ff745f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -35,6 +35,7 @@ import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -151,11 +152,15 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -212,11 +217,15 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -291,11 +300,15 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -431,11 +444,15 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index e6adf5c..42c957f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -64,7 +64,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -119,7 +122,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -166,7 +172,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -252,7 +260,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -341,7 +351,10 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { final int totalMessages = 3; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index 6cdc506..2f23554 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; @@ -94,7 +95,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) @@ -128,7 +132,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer int numOfSubscribers = 4; @@ -166,7 +173,10 @@ public class ZeroQueueSizeTest extends BrokerTestBase { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) @@ -217,11 +227,14 @@ public class ZeroQueueSizeTest extends BrokerTestBase { .subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://prop-xyz/use/ns-abc/topic1"); + .topic("persistent://prop-xyz/use/ns-abc/topic1") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = producerBuilder.create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 35e7806..43f8bb2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -94,7 +95,11 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { final int numMessages = 20; final int maxKeys = 10; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); List<Pair<String, byte[]>> all = new ArrayList<>(); @@ -147,7 +152,10 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { public void testReadCompactedBeforeCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -185,7 +193,10 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { public void testReadEntriesAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -214,7 +225,10 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { public void testSeekEarliestAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); @@ -253,7 +267,10 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { public void testBrokerRestartAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -293,7 +310,10 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { public void testCompactEmptyTopic() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -317,7 +337,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producer = pulsarClient.createProducer(topic)) { + try (Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create()) { producer.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -365,9 +385,14 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create() + ) { producer.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -425,10 +450,17 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); - Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producerNormal = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producerBatch = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producerBatch.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -533,10 +565,17 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); - Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producerNormal = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + Producer producerBatch = pulsarClient.newProducer() + .topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create()) { // key0 persists through it all producerNormal.sendAsync(MessageBuilder.create() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 37446ad..7bd5adc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; @@ -117,7 +118,10 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { final int numMessages = 1000; final int maxKeys = 10; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); Random r = new Random(0); @@ -138,7 +142,10 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactAddCompact() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); @@ -168,7 +175,10 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactedInOrder() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send(MessageBuilder.create() .setKey("c") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index 03d82a7..a9723a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -115,7 +115,7 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable { * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. * <p> * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail - * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * unless blockIfQueueFull is set to true. Use {@link #blockIfQueueFull(boolean)} to change the blocking behavior. * * @param maxPendingMessages * @return @@ -148,13 +148,15 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable { /** * Set the message routing mode for the partitioned producer. * - * Default routing mode for messages to partition. + * Default routing mode is round-robin routing. * * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a * particular message. * * @param messageRoutingMode * the message routing mode + * @return producer builder + * @see MessageRoutingMode */ ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode); @@ -205,9 +207,13 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable { * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or * contents. * - * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages + * When enabled default batch delay is set to 1 ms and default batch size is 1000 messages * + * <p>Batching is enabled by default since 2.0.0. + * + * @return producer builder. * @see #batchingMaxPublishDelay(long, TimeUnit) + * @see #batchingMaxMessages(int) */ ProducerBuilder<T> enableBatching(boolean enableBatching); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 49cd1d8..1af077b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -184,10 +184,11 @@ public class ProducerConfiguration implements Serializable { } /** - * Set the message routing mode for the partitioned producer + * Set the message routing mode for the partitioned producer. * - * @param mode - * @return + * @param messageRouteMode message routing mode. + * @return producer configuration + * @see MessageRoutingMode */ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) { checkNotNull(messageRouteMode); @@ -197,9 +198,10 @@ public class ProducerConfiguration implements Serializable { } /** - * Get the message routing mode for the partitioned producer + * Get the message routing mode for the partitioned producer. * - * @return + * @return message routing mode, default is round-robin routing. + * @see MessageRoutingMode#RoundRobinPartition */ public MessageRoutingMode getMessageRoutingMode() { return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString()); @@ -269,9 +271,12 @@ public class ProducerConfiguration implements Serializable { } /** - * @ return if batch messages are enabled + * Return the flag whether automatic message batching is enabled or not. + * + * @return true if batch messages are enabled. otherwise false. + * @since 2.0.0 <br> + * It is enabled by default. */ - public boolean getBatchingEnabled() { return conf.isBatchingEnabled(); } @@ -290,8 +295,8 @@ public class ProducerConfiguration implements Serializable { * @since 1.0.36 <br> * Make sure all the consumer applications have been updated to use this client version, before starting to * batch messages. + * */ - public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) { conf.setBatchingEnabled(batchMessagesEnabled); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 1449a45..9f06809 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -50,7 +50,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private boolean blockIfQueueFull = false; private int maxPendingMessages = 1000; private int maxPendingMessagesAcrossPartitions = 50000; - private MessageRoutingMode messageRoutingMode = MessageRoutingMode.SinglePartition; + private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition; private HashingScheme hashingScheme = HashingScheme.JavaStringHash; private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL; @@ -58,9 +58,9 @@ public class ProducerConfigurationData implements Serializable, Cloneable { @JsonIgnore private MessageRouter customMessageRouter = null; - private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(10); + private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1); private int batchingMaxMessages = 1000; - private boolean batchingEnabled = false; // disabled by default + private boolean batchingEnabled = true; // enabled by default @JsonIgnore private CryptoKeyReader cryptoKeyReader; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index ee9d048..651056b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -85,8 +85,11 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { public void testProducerConsumer() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) .build(); - Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/producer-consumer-topic") - .create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://sample/test/local/producer-consumer-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // Create a consumer directly attached to broker Consumer<byte[]> consumer = pulsarClient.newConsumer() @@ -116,8 +119,10 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { .build(); admin.persistentTopics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); - Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/partitioned-topic") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://sample/test/local/partitioned-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // Create a consumer directly attached to broker Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic") diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index c2c3ee9..5ed8adf 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -251,6 +251,9 @@ public class ProducerHandler extends AbstractWebSocketHandler { private ProducerConfiguration getProducerConfiguration() { ProducerConfiguration conf = new ProducerConfiguration(); + conf.setBatchingEnabled(false); + conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition); + // Set to false to prevent the server thread from being blocked if a lot of messages are pending. conf.setBlockIfQueueFull(false); diff --git a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java index 43c3866..3611736 100644 --- a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java +++ b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.tests.DockerUtils; @@ -65,7 +66,11 @@ public class TestCompaction extends Arquillian { try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) { client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); - try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) { + try(Producer<byte[]> producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); } -- To stop receiving notification emails like this one, please contact si...@apache.org.