sijie closed pull request #3193: [Pulsar-Client] Extends ProducerBuilder
Validations
URL: https://github.com/apache/pulsar/pull/3193
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 19df2e9ecc..92526f52b3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -45,6 +46,8 @@
import lombok.NonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
@@ -121,30 +124,28 @@ private ProducerBuilderImpl(PulsarClientImpl client,
ProducerConfigurationData c
@Override
public ProducerBuilder<T> topic(String topicName) {
+ checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be
blank");
conf.setTopicName(topicName);
return this;
}
@Override
- public ProducerBuilder<T> producerName(@NonNull String producerName) {
+ public ProducerBuilder<T> producerName(String producerName) {
+ checkArgument(StringUtils.isNotBlank(producerName), "producerName
cannot be blank");
conf.setProducerName(producerName);
return this;
}
@Override
public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit
unit) {
- if (sendTimeout < 0) {
- throw new IllegalArgumentException("sendTimeout needs to be >= 0");
- }
+ checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
- if (maxPendingMessages <= 0) {
- throw new IllegalArgumentException("maxPendingMessages needs to be
> 0");
- }
+ checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be
> 0");
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@@ -198,7 +199,8 @@ private ProducerBuilderImpl(PulsarClientImpl client,
ProducerConfigurationData c
}
@Override
- public ProducerBuilder<T> addEncryptionKey(@NonNull String key) {
+ public ProducerBuilder<T> addEncryptionKey(String key) {
+ checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be
blank");
conf.getEncryptionKeys().add(key);
return this;
}
@@ -228,13 +230,19 @@ private ProducerBuilderImpl(PulsarClientImpl client,
ProducerConfigurationData c
}
@Override
- public ProducerBuilder<T> property(@NonNull String key, @NonNull String
value) {
+ public ProducerBuilder<T> property(String key, String value) {
+ checkArgument(StringUtils.isNotBlank(key) &&
StringUtils.isNotBlank(value),
+ "property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder<T> properties(@NonNull Map<String, String>
properties) {
+ checkArgument(!properties.isEmpty(), "properties cannot be empty");
+ properties.entrySet().forEach(entry ->
+ checkArgument(StringUtils.isNotBlank(entry.getKey()) &&
StringUtils.isNotBlank(entry.getValue()),
+ "properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index e436e968d8..d6877c18fa 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -24,7 +24,10 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -52,6 +55,23 @@ public void setup() {
.thenReturn(CompletableFuture.completedFuture(producer));
}
+ @Test
+ public void testProducerBuilderImpl() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key2", "Test-Value2");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(2)
+ .addEncryptionKey("Test-EncryptionKey")
+ .property("Test-Key", "Test-Value")
+ .properties(properties)
+ .create();
+
+ assertNotNull(producer);
+ }
+
@Test
public void
testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws
PulsarClientException {
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
@@ -126,6 +146,164 @@ public void
testProducerBuilderImplWhenMessageRoutingModeIsCustomPartitionAndMes
.create();
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenSendTimeoutIsNegative() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .sendTimeout(-1, TimeUnit.MILLISECONDS)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenMaxPendingMessagesIsNegative()
throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(-1)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(null, "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(" ", "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsBlank() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", " ")
+ .create();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testProducerBuilderImplWhenPropertiesIsNull() throws
PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsNull() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(null, "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsBlank() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(" ", "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsNull() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", null);
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsBlank() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", " ");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesIsEmpty() throws
PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
private class CustomMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services