This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6fd7406 Remove guava usage in pulsar-storm (#2898) 6fd7406 is described below commit 6fd7406a4de061235ac718bb199f5e2f36fb2cc9 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Nov 7 13:34:12 2018 -0800 Remove guava usage in pulsar-storm (#2898) --- pulsar-storm/pom.xml | 34 --------------------- .../java/org/apache/pulsar/storm/PulsarBolt.java | 19 ++++++------ .../pulsar/storm/PulsarBoltConfiguration.java | 4 +-- .../java/org/apache/pulsar/storm/PulsarSpout.java | 35 +++++++++++----------- .../pulsar/storm/PulsarSpoutConfiguration.java | 5 ++-- .../apache/pulsar/storm/SharedPulsarClient.java | 5 ++-- 6 files changed, 32 insertions(+), 70 deletions(-) diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml index a025fdf..93a58cc 100644 --- a/pulsar-storm/pom.xml +++ b/pulsar-storm/pom.xml @@ -63,11 +63,6 @@ </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -109,34 +104,5 @@ <filtering>true</filtering> </resource> </resources> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - <promoteTransitiveDependencies>false</promoteTransitiveDependencies> - <artifactSet> - <includes> - <include>com.google.guava:guava</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google</pattern> - <shadedPattern>pulsar-storm-shade.com.google</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> </build> </project> diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java index 0aa1ee3..bc95e31 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java @@ -21,6 +21,8 @@ package org.apache.pulsar.storm; import static java.lang.String.format; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.pulsar.client.api.ClientBuilder; @@ -42,9 +44,6 @@ import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - @SuppressWarnings("deprecation") public class PulsarBolt extends BaseRichBolt implements IMetric { /** @@ -60,7 +59,7 @@ public class PulsarBolt extends BaseRichBolt implements IMetric { private final ClientConfigurationData clientConf; private final ProducerConfigurationData producerConf; private final PulsarBoltConfiguration pulsarBoltConf; - private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap(); + private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>(); private SharedPulsarClient sharedPulsarClient; private String componentId; @@ -73,9 +72,9 @@ public class PulsarBolt extends BaseRichBolt implements IMetric { public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) { this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(); this.producerConf = new ProducerConfigurationData(); - Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl()); - Preconditions.checkNotNull(pulsarBoltConf.getTopic()); - Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper()); + Objects.requireNonNull(pulsarBoltConf.getServiceUrl()); + Objects.requireNonNull(pulsarBoltConf.getTopic()); + 
Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper()); this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl()); this.producerConf.setTopicName(pulsarBoltConf.getTopic()); @@ -98,9 +97,9 @@ public class PulsarBolt extends BaseRichBolt implements IMetric { ProducerConfiguration producerConf) { this.clientConf = clientConf.getConfigurationData().clone(); this.producerConf = producerConf.getProducerConfigurationData().clone(); - Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl()); - Preconditions.checkNotNull(pulsarBoltConf.getTopic()); - Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper()); + Objects.requireNonNull(pulsarBoltConf.getServiceUrl()); + Objects.requireNonNull(pulsarBoltConf.getTopic()); + Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper()); this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl()); this.producerConf.setTopicName(pulsarBoltConf.getTopic()); this.pulsarBoltConf = pulsarBoltConf; diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java index a67ac2c..714e435 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.storm; -import com.google.common.base.Preconditions; +import java.util.Objects; /** * Class used to specify Pulsar bolt configuration @@ -51,7 +51,7 @@ public class PulsarBoltConfiguration extends PulsarStormConfiguration { * @param mapper */ public void setTupleToMessageMapper(TupleToMessageMapper mapper) { - this.tupleToMessageMapper = Preconditions.checkNotNull(mapper); + this.tupleToMessageMapper = Objects.requireNonNull(mapper); } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 
af26035..5df0804 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -20,9 +20,13 @@ package org.apache.pulsar.storm; import static java.lang.String.format; +import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -38,7 +42,6 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.storm.metric.api.IMetric; -import org.apache.storm.shade.com.google.common.collect.Sets; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -48,10 +51,6 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; - @SuppressWarnings("deprecation") public class PulsarSpout extends BaseRichSpout implements IMetric { @@ -70,9 +69,9 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { private final PulsarSpoutConfiguration pulsarSpoutConf; private final long failedRetriesTimeoutNano; private final int maxFailedRetries; - private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = Maps.newConcurrentMap(); - private final Queue<Message<byte[]>> failedMessages = Queues.newConcurrentLinkedQueue(); - private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap(); + private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new 
ConcurrentHashMap<>(); + private final Queue<Message<byte[]>> failedMessages = new ConcurrentLinkedQueue<>(); + private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>(); private SharedPulsarClient sharedPulsarClient; private String componentId; @@ -85,15 +84,15 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { private volatile long messageSizeReceived = 0; public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) { - Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl()); - Preconditions.checkNotNull(pulsarSpoutConf.getTopic()); - Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName()); - Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper()); + Objects.requireNonNull(pulsarSpoutConf.getServiceUrl()); + Objects.requireNonNull(pulsarSpoutConf.getTopic()); + Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName()); + Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper()); this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(); this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); this.consumerConf = new ConsumerConfigurationData<>(); - this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic())); + this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); this.pulsarSpoutConf = pulsarSpoutConf; @@ -111,13 +110,13 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { ConsumerConfiguration consumerConf) { this.clientConf = clientConf.getConfigurationData().clone(); this.consumerConf = consumerConf.getConfigurationData().clone(); - Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl()); - Preconditions.checkNotNull(pulsarSpoutConf.getTopic()); - Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName()); - 
Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper()); + Objects.requireNonNull(pulsarSpoutConf.getServiceUrl()); + Objects.requireNonNull(pulsarSpoutConf.getTopic()); + Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName()); + Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper()); this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); - this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic())); + this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); this.pulsarSpoutConf = pulsarSpoutConf; diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java index 79e15d2..7582d74 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java @@ -18,10 +18,9 @@ */ package org.apache.pulsar.storm; +import java.util.Objects; import java.util.concurrent.TimeUnit; -import com.google.common.base.Preconditions; - /** * Class used to specify pulsar spout configuration * @@ -75,7 +74,7 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration { * @param mapper */ public void setMessageToValuesMapper(MessageToValuesMapper mapper) { - this.messageToValuesMapper = Preconditions.checkNotNull(mapper); + this.messageToValuesMapper = Objects.requireNonNull(mapper); } /** diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java index 4506e11..d07903e 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java @@ -19,6 +19,7 @@ package org.apache.pulsar.storm; 
import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -33,11 +34,9 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - public class SharedPulsarClient { private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class); - private static final ConcurrentMap<String, SharedPulsarClient> instances = Maps.newConcurrentMap(); + private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>(); private final String componentId; private final PulsarClientImpl client;