This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60df0a9 [Pulsar-Client] Extends ProducerBuilder Validations (#3193)
60df0a9 is described below
commit 60df0a989dc541ee05b1484a859d72ba7c0c499b
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Dec 16 01:34:04 2018 +0000
[Pulsar-Client] Extends ProducerBuilder Validations (#3193)
### Motivation
This PR aims to extend validations on `ProducerBuilder`.
### Modifications
1- `ProducerBuilder` needs to be robust for null and blank values as Public
API.
(e.g: When `topicName` is passed as blank, it comes as
`persistent://public/default/ `)
2- UT coverage is added.
---
.../pulsar/client/impl/ProducerBuilderImpl.java | 26 +--
.../client/impl/ProducerBuilderImplTest.java | 178 +++++++++++++++++++++
2 files changed, 195 insertions(+), 9 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 19df2e9..92526f5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -45,6 +46,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import lombok.NonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
@@ -121,30 +124,28 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
@Override
public ProducerBuilder<T> topic(String topicName) {
+ checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be
blank");
conf.setTopicName(topicName);
return this;
}
@Override
- public ProducerBuilder<T> producerName(@NonNull String producerName) {
+ public ProducerBuilder<T> producerName(String producerName) {
+ checkArgument(StringUtils.isNotBlank(producerName), "producerName
cannot be blank");
conf.setProducerName(producerName);
return this;
}
@Override
public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit
unit) {
- if (sendTimeout < 0) {
- throw new IllegalArgumentException("sendTimeout needs to be >= 0");
- }
+ checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
- if (maxPendingMessages <= 0) {
- throw new IllegalArgumentException("maxPendingMessages needs to be
> 0");
- }
+ checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be
> 0");
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@@ -198,7 +199,8 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
}
@Override
- public ProducerBuilder<T> addEncryptionKey(@NonNull String key) {
+ public ProducerBuilder<T> addEncryptionKey(String key) {
+ checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be
blank");
conf.getEncryptionKeys().add(key);
return this;
}
@@ -228,13 +230,19 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
}
@Override
- public ProducerBuilder<T> property(@NonNull String key, @NonNull String
value) {
+ public ProducerBuilder<T> property(String key, String value) {
+ checkArgument(StringUtils.isNotBlank(key) &&
StringUtils.isNotBlank(value),
+ "property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder<T> properties(@NonNull Map<String, String>
properties) {
+ checkArgument(!properties.isEmpty(), "properties cannot be empty");
+ properties.entrySet().forEach(entry ->
+ checkArgument(StringUtils.isNotBlank(entry.getKey()) &&
StringUtils.isNotBlank(entry.getValue()),
+ "properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index e436e96..d6877c1 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -24,7 +24,10 @@ import org.mockito.Matchers;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -53,6 +56,23 @@ public class ProducerBuilderImplTest {
}
@Test
+ public void testProducerBuilderImpl() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key2", "Test-Value2");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(2)
+ .addEncryptionKey("Test-EncryptionKey")
+ .property("Test-Key", "Test-Value")
+ .properties(properties)
+ .create();
+
+ assertNotNull(producer);
+ }
+
+ @Test
public void
testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws
PulsarClientException {
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
@@ -126,6 +146,164 @@ public class ProducerBuilderImplTest {
.create();
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenSendTimeoutIsNegative() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .sendTimeout(-1, TimeUnit.MILLISECONDS)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenMaxPendingMessagesIsNegative()
throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(-1)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(null, "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(" ", "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", " ")
+ .create();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testProducerBuilderImplWhenPropertiesIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsNull() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(null, "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsBlank() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(" ", "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsNull() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", null);
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsBlank() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", " ");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesIsEmpty() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
private class CustomMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {