This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd7406  Remove guava usage in pulsar-storm (#2898)
6fd7406 is described below

commit 6fd7406a4de061235ac718bb199f5e2f36fb2cc9
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Nov 7 13:34:12 2018 -0800

    Remove guava usage in pulsar-storm (#2898)
---
 pulsar-storm/pom.xml                               | 34 ---------------------
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 19 ++++++------
 .../pulsar/storm/PulsarBoltConfiguration.java      |  4 +--
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 35 +++++++++++-----------
 .../pulsar/storm/PulsarSpoutConfiguration.java     |  5 ++--
 .../apache/pulsar/storm/SharedPulsarClient.java    |  5 ++--
 6 files changed, 32 insertions(+), 70 deletions(-)

diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
index a025fdf..93a58cc 100644
--- a/pulsar-storm/pom.xml
+++ b/pulsar-storm/pom.xml
@@ -63,11 +63,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -109,34 +104,5 @@
         <filtering>true</filtering>
       </resource>
     </resources>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <createDependencyReducedPom>true</createDependencyReducedPom>
-              
<promoteTransitiveDependencies>false</promoteTransitiveDependencies>
-              <artifactSet>
-                <includes>
-                  <include>com.google.guava:guava</include>
-                </includes>
-              </artifactSet>
-              <relocations>
-                <relocation>
-                  <pattern>com.google</pattern>
-                  <shadedPattern>pulsar-storm-shade.com.google</shadedPattern>
-                </relocation>
-              </relocations>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
   </build>
 </project>
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 0aa1ee3..bc95e31 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.storm;
 import static java.lang.String.format;
 
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -42,9 +44,6 @@ import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 @SuppressWarnings("deprecation")
 public class PulsarBolt extends BaseRichBolt implements IMetric {
     /**
@@ -60,7 +59,7 @@ public class PulsarBolt extends BaseRichBolt implements 
IMetric {
     private final ClientConfigurationData clientConf;
     private final ProducerConfigurationData producerConf;
     private final PulsarBoltConfiguration pulsarBoltConf;
-    private final ConcurrentMap<String, Object> metricsMap = 
Maps.newConcurrentMap();
+    private final ConcurrentMap<String, Object> metricsMap = new 
ConcurrentHashMap<>();
 
     private SharedPulsarClient sharedPulsarClient;
     private String componentId;
@@ -73,9 +72,9 @@ public class PulsarBolt extends BaseRichBolt implements 
IMetric {
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder 
clientBuilder) {
         this.clientConf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone();
         this.producerConf = new ProducerConfigurationData();
-        Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
-        Preconditions.checkNotNull(pulsarBoltConf.getTopic());
-        Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+        Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+        Objects.requireNonNull(pulsarBoltConf.getTopic());
+        Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
 
         this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
         this.producerConf.setTopicName(pulsarBoltConf.getTopic());
@@ -98,9 +97,9 @@ public class PulsarBolt extends BaseRichBolt implements 
IMetric {
             ProducerConfiguration producerConf) {
         this.clientConf = clientConf.getConfigurationData().clone();
         this.producerConf = 
producerConf.getProducerConfigurationData().clone();
-        Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
-        Preconditions.checkNotNull(pulsarBoltConf.getTopic());
-        Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+        Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+        Objects.requireNonNull(pulsarBoltConf.getTopic());
+        Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
         this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
         this.producerConf.setTopicName(pulsarBoltConf.getTopic());
         this.pulsarBoltConf = pulsarBoltConf;
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
index a67ac2c..714e435 100644
--- 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.storm;
 
-import com.google.common.base.Preconditions;
+import java.util.Objects;
 
 /**
  * Class used to specify Pulsar bolt configuration
@@ -51,7 +51,7 @@ public class PulsarBoltConfiguration extends 
PulsarStormConfiguration {
      * @param mapper
      */
     public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
-        this.tupleToMessageMapper = Preconditions.checkNotNull(mapper);
+        this.tupleToMessageMapper = Objects.requireNonNull(mapper);
     }
 
 }
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index af26035..5df0804 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -20,9 +20,13 @@ package org.apache.pulsar.storm;
 
 import static java.lang.String.format;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
@@ -38,7 +42,6 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -48,10 +51,6 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
 @SuppressWarnings("deprecation")
 public class PulsarSpout extends BaseRichSpout implements IMetric {
 
@@ -70,9 +69,9 @@ public class PulsarSpout extends BaseRichSpout implements 
IMetric {
     private final PulsarSpoutConfiguration pulsarSpoutConf;
     private final long failedRetriesTimeoutNano;
     private final int maxFailedRetries;
-    private final ConcurrentMap<MessageId, MessageRetries> 
pendingMessageRetries = Maps.newConcurrentMap();
-    private final Queue<Message<byte[]>> failedMessages = 
Queues.newConcurrentLinkedQueue();
-    private final ConcurrentMap<String, Object> metricsMap = 
Maps.newConcurrentMap();
+    private final ConcurrentMap<MessageId, MessageRetries> 
pendingMessageRetries = new ConcurrentHashMap<>();
+    private final Queue<Message<byte[]>> failedMessages = new 
ConcurrentLinkedQueue<>();
+    private final ConcurrentMap<String, Object> metricsMap = new 
ConcurrentHashMap<>();
 
     private SharedPulsarClient sharedPulsarClient;
     private String componentId;
@@ -85,15 +84,15 @@ public class PulsarSpout extends BaseRichSpout implements 
IMetric {
     private volatile long messageSizeReceived = 0;
 
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder 
clientBuilder) {
-        Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
-        Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
-        Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
-        Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+        Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+        Objects.requireNonNull(pulsarSpoutConf.getTopic());
+        Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+        Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
 
         this.clientConf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone();
         this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
         this.consumerConf = new ConsumerConfigurationData<>();
-        
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        
this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
         
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
 
         this.pulsarSpoutConf = pulsarSpoutConf;
@@ -111,13 +110,13 @@ public class PulsarSpout extends BaseRichSpout implements 
IMetric {
             ConsumerConfiguration consumerConf) {
         this.clientConf = clientConf.getConfigurationData().clone();
         this.consumerConf = consumerConf.getConfigurationData().clone();
-        Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
-        Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
-        Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
-        Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+        Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+        Objects.requireNonNull(pulsarSpoutConf.getTopic());
+        Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+        Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
 
         this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
-        
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        
this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
         
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
 
         this.pulsarSpoutConf = pulsarSpoutConf;
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 79e15d2..7582d74 100644
--- 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pulsar.storm;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Class used to specify pulsar spout configuration
  *
@@ -75,7 +74,7 @@ public class PulsarSpoutConfiguration extends 
PulsarStormConfiguration {
      * @param mapper
      */
     public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
-        this.messageToValuesMapper = Preconditions.checkNotNull(mapper);
+        this.messageToValuesMapper = Objects.requireNonNull(mapper);
     }
 
     /**
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 4506e11..d07903e 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.storm;
 
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -33,11 +34,9 @@ import 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 public class SharedPulsarClient {
     private static final Logger LOG = 
LoggerFactory.getLogger(SharedPulsarClient.class);
-    private static final ConcurrentMap<String, SharedPulsarClient> instances = 
Maps.newConcurrentMap();
+    private static final ConcurrentMap<String, SharedPulsarClient> instances = 
new ConcurrentHashMap<>();
 
     private final String componentId;
     private final PulsarClientImpl client;

Reply via email to