This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aeddbb  Add Storm adapter back after removing from Apache Storm repo 
(#55)
5aeddbb is described below

commit 5aeddbb21508fa70d36199f95dabebe17774384c
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Wed Nov 1 10:24:57 2023 -0700

    Add Storm adapter back after removing from Apache Storm repo (#55)
    
    This reverts commit ab537c13061b06de2044cd43965de930d762fe8a.
---
 .asf.yaml                                          |   3 +-
 README.md                                          |   4 +-
 pom.xml                                            |  19 +
 pulsar-storm/pom.xml                               | 102 +++++
 .../apache/pulsar/storm/MessageToValuesMapper.java |  44 ++
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 207 +++++++++
 .../pulsar/storm/PulsarBoltConfiguration.java      |  57 +++
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 494 +++++++++++++++++++++
 .../pulsar/storm/PulsarSpoutConfiguration.java     | 195 ++++++++
 .../apache/pulsar/storm/PulsarSpoutConsumer.java   |  58 +++
 .../pulsar/storm/PulsarStormConfiguration.java     |  90 ++++
 .../java/org/apache/pulsar/storm/PulsarTuple.java  |  45 ++
 .../apache/pulsar/storm/SharedPulsarClient.java    | 155 +++++++
 .../apache/pulsar/storm/TupleToMessageMapper.java  |  66 +++
 pulsar-storm/src/main/javadoc/overview.html        |  29 ++
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 178 ++++++++
 tests/pom.xml                                      |   1 +
 tests/pulsar-storm-test/pom.xml                    | 131 ++++++
 .../apache/pulsar/storm/MockOutputCollector.java   | 101 +++++
 .../pulsar/storm/MockSpoutOutputCollector.java     |  80 ++++
 .../org/apache/pulsar/storm/PulsarBoltTest.java    | 236 ++++++++++
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 349 +++++++++++++++
 .../java/org/apache/pulsar/storm/TestUtil.java     |  35 ++
 .../apache/pulsar/storm/example/StormExample.java  | 166 +++++++
 24 files changed, 2841 insertions(+), 4 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index c04e151..74589a2 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -27,6 +27,7 @@ github:
     - streaming
     - queuing
     - event-streaming
+    - apache-storm
     - apache-spark
     - apache-kafka
   features:
@@ -47,4 +48,4 @@ github:
 notifications:
   commits:      commits@pulsar.apache.org
   issues:       commits@pulsar.apache.org
-  pullrequests: commits@pulsar.apache.org
+  pullrequests: commits@pulsar.apache.org
\ No newline at end of file
diff --git a/README.md b/README.md
index 33409df..ce62338 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,6 @@ This repository is used for hosting all the adapters 
maintained and supported by
 
 [Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is 
supported and maintained by Apache Flink Community.
 
-[Apache Storm bolt and 
spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are 
supported by Apache Storm Community.
-
 ## Building
 
 In order to build this code you can simply use Maven
@@ -44,5 +42,5 @@ git checkout v2.11.0
 mvn clean install -DskipTests
 ```
 
-This is because this repository depends on test integration artifacts of the 
relative version on the main
+This is because this repository depends on test integration artifacts of the 
relative version on the main 
 Apache Pulsar codebase
diff --git a/pom.xml b/pom.xml
index 5f39f48..4f240d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
   <properties>
     <pulsar.version>2.11.0</pulsar.version>
     <kafka-client.version>2.7.2</kafka-client.version>
+    <storm.version>2.0.0</storm.version>
     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
     <avro.version>1.10.2</avro.version>
     <log4j.version>1.2.17</log4j.version>
@@ -139,6 +140,7 @@
   </properties>
 
   <modules>
+    <module>pulsar-storm</module>
     <module>pulsar-spark</module>
     <module>pulsar-client-kafka-compat</module>
     <module>pulsar-log4j2-appender</module>
@@ -250,6 +252,22 @@
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-client</artifactId>
+        <version>${storm.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-server</artifactId>
+        <version>${storm.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-core</artifactId>
+        <version>${storm.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.9.2</artifactId>
@@ -1068,3 +1086,4 @@
   </repositories>
 
 </project>
+
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
new file mode 100644
index 0000000..a20649b
--- /dev/null
+++ b/pulsar-storm/pom.xml
@@ -0,0 +1,102 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-adapters</artifactId>
+    <version>2.11.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-storm</artifactId>
+  <name>Pulsar Storm adapter</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.yaml</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-client</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
new file mode 100644
index 0000000..92e127c
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+
+public interface MessageToValuesMapper extends Serializable {
+
+    /**
+     * Convert {@link org.apache.pulsar.client.api.Message} to tuple values.
+     *
+     * @param msg
+     * @return
+     */
+    Values toValues(Message<byte[]> msg);
+
+    /**
+     * Declare the output schema for the spout.
+     *
+     * @param declarer
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
new file mode 100644
index 0000000..32fa78f
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class PulsarBolt extends BaseRichBolt implements IMetric {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarBolt.class);
+
+    public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
+    public static final String PRODUCER_RATE = "producerRate";
+    public static final String PRODUCER_THROUGHPUT_BYTES = 
"producerThroughput";
+
+    private final ClientConfigurationData clientConf;
+    private final ProducerConfigurationData producerConf;
+    private final PulsarBoltConfiguration pulsarBoltConf;
+    private final ConcurrentMap<String, Object> metricsMap = new 
ConcurrentHashMap<>();
+
+    private SharedPulsarClient sharedPulsarClient;
+    private String componentId;
+    private String boltId;
+    private OutputCollector collector;
+    private Producer<byte[]> producer;
+    private volatile long messagesSent = 0;
+    private volatile long messageSizeSent = 0;
+
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+        this(pulsarBoltConf, PulsarClient.builder());
+    }
+
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder 
clientBuilder) {
+        this(pulsarBoltConf, ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone(),
+                new ProducerConfigurationData());
+    }
+
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, 
ClientConfigurationData clientConf,
+            ProducerConfigurationData producerConf) {
+        checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+        checkNotNull(clientConf, "client configuration can't be null");
+        checkNotNull(producerConf, "producer configuration can't be null");
+        Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+        Objects.requireNonNull(pulsarBoltConf.getTopic());
+        Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
+        this.pulsarBoltConf = pulsarBoltConf;
+        this.clientConf = clientConf;
+        this.producerConf = producerConf;
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+        this.producerConf.setBatcherBuilder(null);
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
+        this.componentId = context.getThisComponentId();
+        this.boltId = String.format("%s-%s", componentId, 
context.getThisTaskId());
+        this.collector = collector;
+        try {
+            sharedPulsarClient = SharedPulsarClient.get(componentId, 
clientConf);
+            producer = sharedPulsarClient.getSharedProducer(producerConf);
+            LOG.info("[{}] Created a pulsar producer on topic {} to send 
messages", boltId, pulsarBoltConf.getTopic());
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error initializing pulsar producer on topic {}", 
boltId, pulsarBoltConf.getTopic(), e);
+            throw new IllegalStateException(
+                    format("Failed to initialize producer for %s : %s", 
pulsarBoltConf.getTopic(), e.getMessage()), e);
+        }
+        context.registerMetric(String.format("PulsarBoltMetrics-%s-%s", 
componentId, context.getThisTaskIndex()), this,
+                pulsarBoltConf.getMetricsTimeIntervalInSecs());
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            collector.ack(input);
+            return;
+        }
+        try {
+            if (producer != null) {
+                // a message key can be provided in the mapper
+                TypedMessageBuilder<byte[]> msgBuilder = 
pulsarBoltConf.getTupleToMessageMapper()
+                        .toMessage(producer.newMessage(), input);
+                if (msgBuilder == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("[{}] Cannot send null message, acking the 
collector", boltId);
+                    }
+                    collector.ack(input);
+                } else {
+                    final long messageSizeToBeSent = 
((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent()
+                            .remaining();
+                    msgBuilder.sendAsync().handle((msgId, ex) -> {
+                        synchronized (collector) {
+                            if (ex != null) {
+                                collector.reportError(ex);
+                                collector.fail(input);
+                                LOG.error("[{}] Message send failed", boltId, 
ex);
+
+                            } else {
+                                collector.ack(input);
+                                ++messagesSent;
+                                messageSizeSent += messageSizeToBeSent;
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("[{}] Message sent with id {}", 
boltId, msgId);
+                                }
+                            }
+                        }
+
+                        return null;
+                    });
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("[{}] Message processing failed", boltId, e);
+            collector.reportError(e);
+            collector.fail(input);
+        }
+    }
+
+    public void close() {
+        try {
+            LOG.info("[{}] Closing Pulsar producer on topic {}", boltId, 
pulsarBoltConf.getTopic());
+            if (sharedPulsarClient != null) {
+                sharedPulsarClient.close();
+            }
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error closing Pulsar producer on topic {}", 
boltId, pulsarBoltConf.getTopic(), e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        close();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
+    }
+
+    /**
+     * Helpers for metrics.
+     */
+
+    @SuppressWarnings({ "rawtypes" })
+    ConcurrentMap getMetrics() {
+        metricsMap.put(NO_OF_MESSAGES_SENT, messagesSent);
+        metricsMap.put(PRODUCER_RATE, ((double) messagesSent) / 
pulsarBoltConf.getMetricsTimeIntervalInSecs());
+        metricsMap.put(PRODUCER_THROUGHPUT_BYTES,
+                ((double) messageSizeSent) / 
pulsarBoltConf.getMetricsTimeIntervalInSecs());
+        return metricsMap;
+    }
+
+    void resetMetrics() {
+        messagesSent = 0;
+        messageSizeSent = 0;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Object getValueAndReset() {
+        ConcurrentMap metrics = getMetrics();
+        resetMetrics();
+        return metrics;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
new file mode 100644
index 0000000..714e435
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Objects;
+
+/**
+ * Class used to specify Pulsar bolt configuration
+ *
+ *
+ */
+public class PulsarBoltConfiguration extends PulsarStormConfiguration {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+
+    private TupleToMessageMapper tupleToMessageMapper = null;
+
+    /**
+     * @return the mapper to convert storm tuples to a pulsar message
+     */
+    public TupleToMessageMapper getTupleToMessageMapper() {
+        return tupleToMessageMapper;
+    }
+
+    /**
+     * Sets the mapper to convert storm tuples to a pulsar message
+     * <p>
+     * Note: If the mapper returns null, the message is not sent by the 
producer and is acked immediately on the
+     * collector
+     * </p>
+     *
+     * @param mapper
+     */
+    public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
+        this.tupleToMessageMapper = Objects.requireNonNull(mapper);
+    }
+
+}
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
new file mode 100644
index 0000000..8ed090e
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarSpout extends BaseRichSpout implements IMetric {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSpout.class);
+
+    public static final String NO_OF_PENDING_FAILED_MESSAGES = 
"numberOfPendingFailedMessages";
+    public static final String NO_OF_MESSAGES_RECEIVED = 
"numberOfMessagesReceived";
+    public static final String NO_OF_MESSAGES_EMITTED = 
"numberOfMessagesEmitted";
+    public static final String NO_OF_MESSAGES_FAILED = 
"numberOfMessagesFailed";
+    public static final String MESSAGE_NOT_AVAILABLE_COUNT = 
"messageNotAvailableCount";
+    public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
+    public static final String CONSUMER_RATE = "consumerRate";
+    public static final String CONSUMER_THROUGHPUT_BYTES = 
"consumerThroughput";
+
+    private final ClientConfigurationData clientConf;
+    private final PulsarSpoutConfiguration pulsarSpoutConf;
+    private final ConsumerConfigurationData<byte[]> consumerConf;
+    private final long failedRetriesTimeoutNano;
+    private final int maxFailedRetries;
+    private final ConcurrentMap<MessageId, MessageRetries> 
pendingMessageRetries = new ConcurrentHashMap<>();
+    private final Queue<Message<byte[]>> failedMessages = new 
ConcurrentLinkedQueue<>();
+    private final ConcurrentMap<String, Object> metricsMap = new 
ConcurrentHashMap<>();
+
+    private SharedPulsarClient sharedPulsarClient;
+    private String componentId;
+    private String spoutId;
+    private SpoutOutputCollector collector;
+    private PulsarSpoutConsumer consumer;
+    private volatile long messagesReceived = 0;
+    private volatile long messagesEmitted = 0;
+    private volatile long messagesFailed = 0;
+    private volatile long messageNotAvailableCount = 0;
+    private volatile long pendingAcks = 0;
+    private volatile long messageSizeReceived = 0;
+
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
+        this(pulsarSpoutConf, PulsarClient.builder());
+    }
+
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder 
clientBuilder) {
+        this(pulsarSpoutConf, ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone(),
+                new ConsumerConfigurationData<byte[]>());
+    }
+
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, 
ClientConfigurationData clientConfig,
+            ConsumerConfigurationData<byte[]> consumerConfig) {
+        Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+        Objects.requireNonNull(pulsarSpoutConf.getTopic());
+        Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+        Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
+        checkNotNull(clientConfig, "client configuration can't be null");
+        checkNotNull(consumerConfig, "consumer configuration can't be null");
+        this.clientConf = clientConfig;
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf = consumerConfig;
+        this.pulsarSpoutConf = pulsarSpoutConf;
+        this.failedRetriesTimeoutNano = 
pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+    }
+
+    @Override
+    public void close() {
+        try {
+            LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, 
pulsarSpoutConf.getTopic());
+
+            if (pulsarSpoutConf.isAutoUnsubscribe()) {
+                try {
+                    consumer.unsubscribe();
+                } catch (PulsarClientException e) {
+                    LOG.error("[{}] Failed to unsubscribe {} on topic {}", 
spoutId,
+                            this.pulsarSpoutConf.getSubscriptionName(), 
pulsarSpoutConf.getTopic(), e);
+                }
+            }
+
+            if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != 
null) {
+                consumer.close();
+            }
+            if (sharedPulsarClient != null) {
+                sharedPulsarClient.close();
+            }
+            pendingMessageRetries.clear();
+            failedMessages.clear();
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error closing Pulsar consumer for topic {}", 
spoutId, pulsarSpoutConf.getTopic(), e);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        if (msgId instanceof Message) {
+            Message<?> msg = (Message<?>) msgId;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Received ack for message {}", spoutId, 
msg.getMessageId());
+            }
+            consumer.acknowledgeAsync(msg);
+            pendingMessageRetries.remove(msg.getMessageId());
+            // we should also remove message from failedMessages but it will be
+            // eventually removed while emitting next
+            // tuple
+            --pendingAcks;
+        }
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        if (msgId instanceof Message) {
+            @SuppressWarnings("unchecked")
+            Message<byte[]> msg = (Message<byte[]>) msgId;
+            MessageId id = msg.getMessageId();
+            LOG.warn("[{}] Error processing message {}", spoutId, id);
+
+            // Since the message processing failed, we put it in the failed
+            // messages queue if there are more retries
+            // remaining for the message
+            MessageRetries messageRetries = 
pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries());
+            if ((failedRetriesTimeoutNano < 0
+                    || (messageRetries.getTimeStamp() + 
failedRetriesTimeoutNano) > System.nanoTime())
+                    && (maxFailedRetries < 0 || messageRetries.numRetries < 
maxFailedRetries)) {
+                // since we can retry again, we increment retry count and put 
it
+                // in the queue
+                LOG.info("[{}] Putting message {} in the retry queue", 
spoutId, id);
+                messageRetries.incrementAndGet();
+                pendingMessageRetries.putIfAbsent(id, messageRetries);
+                failedMessages.add(msg);
+                --pendingAcks;
+                messagesFailed++;
+            } else {
+                LOG.warn("[{}] Number of retries limit reached, dropping the 
message {}", spoutId, id);
+                ack(msg);
+            }
+        }
+
+    }
+
+    /**
+     * Emits a tuple received from the Pulsar consumer unless there are any
+     * failed messages.
+     */
+    @Override
+    public void nextTuple() {
+        emitNextAvailableTuple();
+    }
+
+    /**
+     * It makes sure that it emits the next available non-null tuple to the
+     * topology unless the consumer queue doesn't have any message available.
+     * It receives a message from the consumer queue, converts it to a tuple
+     * and emits it to the topology. If the converted tuple is null, it tries
+     * to receive the next message and repeats until it finds a non-null tuple.
+     */
+    public void emitNextAvailableTuple() {
+        // check if there are any failed messages to re-emit in the topology
+        if (emitFailedMessage()) {
+            return;
+        }
+
+        Message<byte[]> msg;
+        // receive from consumer if no failed messages
+        if (consumer != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Receiving the next message from pulsar 
consumer to emit to the collector", spoutId);
+            }
+            try {
+                boolean done = false;
+                while (!done) {
+                    msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        ++messagesReceived;
+                        messageSizeReceived += msg.getData().length;
+                        done = mapToValueAndEmit(msg);
+                    } else {
+                        // queue is empty and nothing to emit
+                        done = true;
+                        messageNotAvailableCount++;
+                    }
+                }
+            } catch (PulsarClientException e) {
+                LOG.error("[{}] Error receiving message from pulsar consumer", 
spoutId, e);
+            }
+        }
+    }
+
+    /**
+     * Tries to re-emit the message at the head of the failed-message queue.
+     * Entries that no longer have retry bookkeeping in
+     * {@code pendingMessageRetries} are discarded from the queue. For the first
+     * entry that is still tracked, the spout either sleeps for the initial
+     * backoff interval (when the entry must still back off) or removes it from
+     * the queue and emits it again.
+     *
+     * @return {@code true} if a tracked failed message was found (whether it
+     *         was emitted or merely waited on), {@code false} if the queue
+     *         holds no tracked failed message
+     */
+    private boolean emitFailedMessage() {
+        for (Message<byte[]> failed = failedMessages.peek(); failed != null; failed = failedMessages.peek()) {
+            final MessageRetries retries = pendingMessageRetries.get(failed.getMessageId());
+            if (retries == null) {
+                // no retry state left: the message was already acked and removed
+                // from pendingMessageRetries, so drop it from the failed queue
+                // as well and look at the next entry
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
+                            pulsarSpoutConf.getTopic(), spoutId, failed.getMessageId());
+                }
+                failedMessages.remove();
+                continue;
+            }
+
+            final boolean mustWait = Backoff.shouldBackoff(retries.getTimeStamp(), TimeUnit.NANOSECONDS,
+                    retries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(),
+                    clientConf.getMaxBackoffIntervalNanos());
+            if (mustWait) {
+                // too early to retry: pause for the initial backoff interval
+                Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
+            } else {
+                // no backoff needed: take the message off the queue and re-emit it
+                LOG.info("[{}] Retrying failed message {}", spoutId, failed.getMessageId());
+                failedMessages.remove();
+                mapToValueAndEmit(failed);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Initialises the spout for the task it runs in: records the component and
+     * task identity, keeps the collector, clears the retry state, creates the
+     * Pulsar consumer and registers this spout as a metric.
+     *
+     * @param conf      topology configuration (not read by this method)
+     * @param context   supplies the component id, task id and task index
+     * @param collector collector used later to emit tuples
+     * @throws IllegalStateException if the Pulsar consumer cannot be created
+     */
+    @Override
+    @SuppressWarnings({ "rawtypes" })
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.componentId = context.getThisComponentId();
+        this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId());
+        this.collector = collector;
+
+        // forget any retry state from an earlier run
+        pendingMessageRetries.clear();
+        failedMessages.clear();
+
+        try {
+            consumer = createConsumer();
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error creating pulsar consumer on topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
+            throw new IllegalStateException(format("Failed to initialize consumer for %s-%s : %s",
+                    pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e);
+        }
+        LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
+                pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
+
+        final String metricName = String.format("PulsarSpoutMetrics-%s-%s", componentId, context.getThisTaskIndex());
+        context.registerMetric(metricName, this, pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+    }
+
+    /**
+     * Creates the consumer this spout reads from, using the Pulsar client
+     * shared by all tasks of the component.
+     * <p>
+     * A durable subscription is served by a {@link SpoutConsumer}, a
+     * non-durable one by a {@link SpoutReader}. When the shared-consumer option
+     * is enabled the consumer/reader itself is shared through
+     * {@link SharedPulsarClient}; otherwise a dedicated one is created.
+     *
+     * @return the consumer wrapper to receive messages from
+     * @throws PulsarClientException if the client, consumer or reader cannot
+     *                               be created; a failure of the asynchronous
+     *                               creation that is not itself a
+     *                               {@code PulsarClientException} is wrapped
+     *                               in one
+     */
+    private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
+        sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+        if (pulsarSpoutConf.isSharedConsumerEnabled()) {
+            return pulsarSpoutConf.isDurableSubscription()
+                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
+                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
+        }
+        try {
+            return pulsarSpoutConf.isDurableSubscription()
+                    ? new SpoutConsumer(
+                            sharedPulsarClient.getClient().subscribeAsync(newConsumerConfiguration()).join())
+                    : new SpoutReader(
+                            sharedPulsarClient.getClient().createReaderAsync(newReaderConfiguration()).join());
+        } catch (CompletionException e) {
+            // join() wraps the real failure; an unconditional cast would turn a
+            // non-Pulsar cause into a ClassCastException and hide the error
+            final Throwable cause = e.getCause();
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            }
+            throw new PulsarClientException(cause != null ? cause : e);
+        }
+    }
+
+    /**
+     * Declares the spout's output fields by delegating to the configured
+     * message-to-values mapper.
+     *
+     * @param declarer declarer handed on to the mapper
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        final MessageToValuesMapper mapper = pulsarSpoutConf.getMessageToValuesMapper();
+        mapper.declareOutputFields(declarer);
+    }
+
+    /**
+     * Converts a message to tuple values with the configured mapper and emits
+     * them, passing the message itself to the collector alongside the values.
+     * A {@link PulsarTuple} is emitted on the stream it names; any other values
+     * go through the collector's two-argument {@code emit}. When the mapper
+     * returns {@code null} the message is acked straight away instead.
+     *
+     * @param msg message to convert; {@code null} is tolerated
+     * @return {@code true} if a tuple was emitted, {@code false} if
+     *         {@code msg} was {@code null} or the mapper dropped it
+     */
+    private boolean mapToValueAndEmit(Message<byte[]> msg) {
+        if (msg == null) {
+            return false;
+        }
+
+        final Values tupleValues = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
+        ++pendingAcks;
+        if (tupleValues == null) {
+            // the mapper returned null, so the message is dropped and acked
+            // immediately
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId());
+            }
+            ack(msg);
+            return false;
+        }
+
+        if (tupleValues instanceof PulsarTuple) {
+            final String stream = ((PulsarTuple) tupleValues).getOutputStream();
+            collector.emit(stream, tupleValues, msg);
+        } else {
+            collector.emit(tupleValues, msg);
+        }
+        ++messagesEmitted;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
+        }
+        return true;
+    }
+
+    /**
+     * Retry bookkeeping for one message: the {@link System#nanoTime()} reading
+     * taken when the record was created and the number of retries counted so
+     * far.
+     */
+    public class MessageRetries {
+        private final long timestampInNano = System.nanoTime();
+        private int numRetries = 0;
+
+        /** Creates a record stamped with the current nano time and zero retries. */
+        public MessageRetries() {
+            // state is set up by the field initialisers
+        }
+
+        /**
+         * @return the nano-time reading captured at construction
+         */
+        public long getTimeStamp() {
+            return this.timestampInNano;
+        }
+
+        /**
+         * Counts one more retry.
+         *
+         * @return the retry count after the increment
+         */
+        public int incrementAndGet() {
+            numRetries += 1;
+            return numRetries;
+        }
+
+        /**
+         * @return the number of retries counted so far
+         */
+        public int getNumRetries() {
+            return this.numRetries;
+        }
+    }
+
+    /**
+     * Refreshes {@code metricsMap} with the current counter values and returns
+     * it. The consumer rate and throughput entries are the received message
+     * count and received byte count divided by the configured metrics interval
+     * in seconds.
+     *
+     * @return {@code metricsMap} after the refresh
+     */
+    @SuppressWarnings({ "rawtypes" })
+    ConcurrentMap getMetrics() {
+        final int intervalSecs = pulsarSpoutConf.getMetricsTimeIntervalInSecs();
+
+        // plain counters
+        metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
+        metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
+        metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+        metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+        metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
+        metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
+
+        // per-second figures over the metrics interval
+        metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / intervalSecs);
+        metricsMap.put(CONSUMER_THROUGHPUT_BYTES, ((double) messageSizeReceived) / intervalSecs);
+        return metricsMap;
+    }
+
+    /**
+     * Zeroes the counters that are reported per metrics interval. The pending
+     * ack counter and the retry bookkeeping are left untouched.
+     */
+    void resetMetrics() {
+        // receive side
+        messagesReceived = 0;
+        messageSizeReceived = 0;
+        messageNotAvailableCount = 0;
+        // emit / fail side
+        messagesEmitted = 0;
+        messagesFailed = 0;
+    }
+
+    /**
+     * Metric callback: refreshes the metrics map, resets the per-interval
+     * counters and hands the map back.
+     *
+     * @return the map produced by {@code getMetrics()}
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Object getValueAndReset() {
+        final ConcurrentMap refreshed = getMetrics();
+        resetMetrics();
+        return refreshed;
+    }
+
+    /**
+     * Builds the reader configuration used for a non-durable subscription.
+     * Topic, reader name (the subscription name) and start position come from
+     * the spout configuration. When a consumer configuration is present, its
+     * crypto settings, read-compacted flag and receiver queue size are copied
+     * over as well.
+     *
+     * @return a new reader configuration
+     */
+    private ReaderConfigurationData<byte[]> newReaderConfiguration() {
+        final ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
+        conf.setTopicName(pulsarSpoutConf.getTopic());
+        conf.setReaderName(pulsarSpoutConf.getSubscriptionName());
+        conf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+        if (this.consumerConf == null) {
+            return conf;
+        }
+
+        // carry over the settings a reader has in common with a consumer
+        conf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
+        conf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
+        conf.setReadCompacted(consumerConf.isReadCompacted());
+        conf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
+        return conf;
+    }
+
+    /**
+     * Provides the consumer configuration for a durable subscription. The
+     * spout's own {@code consumerConf} is reused (and modified in place) when
+     * it is set; otherwise a fresh configuration is created. In both cases the
+     * topic, subscription name and subscription type are taken from the spout
+     * configuration.
+     *
+     * @return the consumer configuration to subscribe with
+     */
+    private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
+        final ConsumerConfigurationData<byte[]> conf;
+        if (this.consumerConf == null) {
+            conf = new ConsumerConfigurationData<>();
+        } else {
+            conf = this.consumerConf;
+        }
+        conf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
+        conf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+        conf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
+        return conf;
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Consumer}; every
+     * call is forwarded to the wrapped consumer.
+     */
+    static class SpoutConsumer implements PulsarSpoutConsumer {
+        private final Consumer<byte[]> delegate;
+
+        SpoutConsumer(Consumer<byte[]> consumer) {
+            this.delegate = consumer;
+        }
+
+        /** Forwards to {@code Consumer.receive(timeout, unit)}. */
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return delegate.receive(timeout, unit);
+        }
+
+        /** Forwards to {@code Consumer.acknowledgeAsync(msg)}; the returned future is not used. */
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            delegate.acknowledgeAsync(msg);
+        }
+
+        /** Closes the wrapped consumer. */
+        @Override
+        public void close() throws PulsarClientException {
+            delegate.close();
+        }
+
+        /** Unsubscribes the wrapped consumer. */
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            delegate.unsubscribe();
+        }
+
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}.
+     * Acknowledging and unsubscribing do nothing in this implementation.
+     */
+    static class SpoutReader implements PulsarSpoutConsumer {
+        private final Reader<byte[]> delegate;
+
+        SpoutReader(Reader<byte[]> reader) {
+            this.delegate = reader;
+        }
+
+        /** Forwards to {@code Reader.readNext(timeout, unit)}. */
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return delegate.readNext(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            // No-op
+        }
+
+        /**
+         * Closes the wrapped reader.
+         *
+         * @throws PulsarClientException wrapping the {@link IOException} raised
+         *                               by the reader, if any
+         */
+        @Override
+        public void close() throws PulsarClientException {
+            try {
+                delegate.close();
+            } catch (IOException ioe) {
+                throw new PulsarClientException(ioe);
+            }
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            // No-op
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
new file mode 100644
index 0000000..db797ee
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+/**
+ * Configuration of a Pulsar spout: subscription settings, the mapper from
+ * Pulsar messages to Storm tuples and the retry policy for failed messages.
+ * Service URL, topic and metrics interval are inherited from
+ * {@link PulsarStormConfiguration}.
+ */
+public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Default retry timeout for failed messages: 60 seconds, expressed in nanoseconds. */
+    public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60);
+    /** Default maximum number of retries for a failed message. */
+    public static final int DEFAULT_MAX_FAILED_RETRIES = -1;
+
+    // subscription
+    private String subscriptionName;
+    private SubscriptionType subscriptionType = SubscriptionType.Shared;
+    private boolean sharedConsumerEnabled;
+    private boolean autoUnsubscribe;
+    private boolean durableSubscription = true;
+    // read position used when the subscription is non-durable; defaults to the
+    // oldest message available in the topic
+    private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
+
+    // tuple mapping
+    private MessageToValuesMapper messageToValuesMapper;
+
+    // retry policy
+    private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO;
+    private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES;
+
+    /**
+     * @return the subscription name for the consumer in the spout
+     */
+    public String getSubscriptionName() {
+        return this.subscriptionName;
+    }
+
+    /**
+     * Sets the subscription name for the consumer in the spout.
+     *
+     * @param subscriptionName the subscription name
+     */
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @return the subscription type (default: {@code SubscriptionType.Shared})
+     */
+    public SubscriptionType getSubscriptionType() {
+        return this.subscriptionType;
+    }
+
+    /**
+     * Sets the subscription type used by the consumer.
+     *
+     * @param subscriptionType the subscription type
+     */
+    public void setSubscriptionType(SubscriptionType subscriptionType) {
+        this.subscriptionType = subscriptionType;
+    }
+
+    /**
+     * @return the mapper to convert pulsar message to a storm tuple
+     */
+    public MessageToValuesMapper getMessageToValuesMapper() {
+        return this.messageToValuesMapper;
+    }
+
+    /**
+     * Sets the mapper to convert pulsar message to a storm tuple.
+     * <p>
+     * Note: If the mapper returns null, the message is not emitted to the
+     * collector and is acked immediately.
+     * </p>
+     *
+     * @param mapper the mapper; must not be {@code null}
+     * @throws NullPointerException if {@code mapper} is {@code null}
+     */
+    public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
+        this.messageToValuesMapper = Objects.requireNonNull(mapper);
+    }
+
+    /**
+     * Returns the retry timeout converted to the requested unit.
+     *
+     * @param unit unit to express the timeout in
+     * @return the timeout for retrying failed messages
+     */
+    public long getFailedRetriesTimeout(TimeUnit unit) {
+        return unit.convert(this.failedRetriesTimeoutNano, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Sets the timeout within which the spout will re-inject failed messages
+     * with an exponential backoff <i>(default: 60 seconds)</i>. Note: If set to
+     * 0, the message will not be retried when failed. If set to &lt; 0, the
+     * message will be retried forever till it is successfully processed or max
+     * message retry count is reached, whichever comes first.
+     *
+     * @param failedRetriesTimeout the timeout value
+     * @param unit                 the unit of {@code failedRetriesTimeout}
+     */
+    public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) {
+        this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout);
+    }
+
+    /**
+     * @return the maximum number of times a failed message will be retried
+     */
+    public int getMaxFailedRetries() {
+        return this.maxFailedRetries;
+    }
+
+    /**
+     * Sets the maximum number of times the spout will re-inject failed messages
+     * with an exponential backoff <i>(default: -1)</i>. Note: If set to 0, the
+     * message will not be retried when failed. If set to &lt; 0, the message
+     * will be retried forever till it is successfully processed or configured
+     * timeout expires, whichever comes first.
+     *
+     * @param maxFailedRetries the retry limit
+     */
+    public void setMaxFailedRetries(int maxFailedRetries) {
+        this.maxFailedRetries = maxFailedRetries;
+    }
+
+    /**
+     * @return if the consumer is shared across different executors of a spout
+     */
+    public boolean isSharedConsumerEnabled() {
+        return this.sharedConsumerEnabled;
+    }
+
+    /**
+     * Sets whether the consumer will be shared across different executors of a
+     * spout <i>(default: false)</i>.
+     *
+     * @param sharedConsumerEnabled {@code true} to share the consumer
+     */
+    public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
+        this.sharedConsumerEnabled = sharedConsumerEnabled;
+    }
+
+    /**
+     * @return whether the subscription is unsubscribed when the spout is closed
+     */
+    public boolean isAutoUnsubscribe() {
+        return this.autoUnsubscribe;
+    }
+
+    /**
+     * It unsubscribes the subscription when spout gets closed in the topology
+     * <i>(default: false)</i>.
+     *
+     * @param autoUnsubscribe {@code true} to unsubscribe on close
+     */
+    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * @return whether the subscription is durable <i>(default: true)</i>
+     */
+    public boolean isDurableSubscription() {
+        return this.durableSubscription;
+    }
+
+    /**
+     * If the subscription is not durable, a non-durable reader is created that
+     * starts reading from the position given to
+     * {@link #setNonDurableSubscriptionReadPosition(MessageId)}.
+     *
+     * @param durableSubscription {@code false} to read through a non-durable reader
+     */
+    public void setDurableSubscription(boolean durableSubscription) {
+        this.durableSubscription = durableSubscription;
+    }
+
+    /**
+     * @return the start position for a non-durable subscription/reader
+     *         (default: {@code MessageId.earliest})
+     */
+    public MessageId getNonDurableSubscriptionReadPosition() {
+        return this.nonDurableSubscriptionReadPosition;
+    }
+
+    /**
+     * Non-durable-subscription/Reader can be set to start reading from a
+     * specific position, e.g. earliest/latest.
+     *
+     * @param nonDurableSubscriptionReadPosition the position to start reading from
+     */
+    public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
+        this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
+    }
+}
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
new file mode 100644
index 0000000..5502a62
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * The operations a Pulsar spout needs from its message source: receive,
+ * acknowledge, unsubscribe and close.
+ */
+public interface PulsarSpoutConsumer {
+
+    /**
+     * Receives a single message.
+     *
+     * @param waitTime how long to wait for a message
+     * @param unit     unit of {@code waitTime}
+     * @return the received message
+     * @throws PulsarClientException if receiving fails
+     */
+    Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;
+
+    /**
+     * Acks the message asynchronously.
+     *
+     * @param msg the message to acknowledge
+     */
+    void acknowledgeAsync(Message<?> msg);
+
+    /**
+     * Unsubscribes the consumer.
+     *
+     * @throws PulsarClientException if unsubscribing fails
+     */
+    void unsubscribe() throws PulsarClientException;
+
+    /**
+     * Closes the consumer.
+     *
+     * @throws PulsarClientException if closing fails
+     */
+    void close() throws PulsarClientException;
+
+}
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
new file mode 100644
index 0000000..7082bf2
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+/**
+ * Settings shared by the Pulsar storm components: the service URL, the topic
+ * and the interval at which metrics are generated.
+ */
+public class PulsarStormConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Default metrics interval, in seconds. */
+    public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60;
+
+    private String serviceUrl;
+    private String topic;
+    private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS;
+
+    /**
+     * Returns the service URL.
+     *
+     * @return the service URL to connect to from the client, or {@code null}
+     *         if none has been set
+     */
+    public String getServiceUrl() {
+        return this.serviceUrl;
+    }
+
+    /**
+     * Sets the service URL to connect to from the client.
+     *
+     * @param serviceUrl the service URL
+     */
+    public void setServiceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+    }
+
+    /**
+     * Returns the topic.
+     *
+     * @return the topic name for the producer/consumer, or {@code null} if
+     *         none has been set
+     */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * Sets the topic name for the producer/consumer. It should be of the format
+     * {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}.
+     *
+     * @param topic the topic name
+     */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Returns the metrics interval.
+     *
+     * @return the time interval in seconds for metrics generation
+     */
+    public int getMetricsTimeIntervalInSecs() {
+        return this.metricsTimeIntervalInSecs;
+    }
+
+    /**
+     * Sets the time interval in seconds for metrics generation
+     * <i>(default: 60 seconds)</i>.
+     *
+     * @param metricsTimeIntervalInSecs the metrics interval in seconds
+     */
+    public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) {
+        this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs;
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
new file mode 100644
index 0000000..b000827
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * {@link Values} that additionally carry the name of the stream they should be
+ * sent to. A MessageToValuesMapper returns this type to direct an output tuple
+ * to a specific stream.
+ */
+public class PulsarTuple extends Values {
+
+    /** Name of the stream the tuple should be emitted on. */
+    protected final String outputStream;
+
+    /**
+     * Creates a tuple bound to an output stream.
+     *
+     * @param outStream stream the tuple should be emitted on
+     * @param values    the tuple values, passed on to {@link Values}
+     */
+    public PulsarTuple(String outStream, Object... values) {
+        super(values);
+        this.outputStream = outStream;
+    }
+
+    /**
+     * Returns the stream the tuple should be emitted on.
+     *
+     * @return the output stream name
+     */
+    public String getOutputStream() {
+        return this.outputStream;
+    }
+}
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
new file mode 100644
index 0000000..ce7bfb9
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder of one Pulsar client per component id, together with an optional
+ * shared consumer, reader and producer created lazily on that client.
+ * <p>
+ * Every call to {@link #getClient()}, {@link #getSharedConsumer},
+ * {@link #getSharedReader} or {@link #getSharedProducer} increments a usage
+ * counter; {@link #close()} decrements it and closes the client once the
+ * counter is no longer positive.
+ */
+public class SharedPulsarClient {
+    private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
+    private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>();
+
+    private final String componentId;
+    private final PulsarClientImpl client;
+    private final AtomicInteger counter = new AtomicInteger();
+
+    // lazily created under the instance lock, see the getShared* methods
+    private Consumer<byte[]> consumer;
+    private Reader<byte[]> reader;
+    private Producer<byte[]> producer;
+
+    private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) throws PulsarClientException {
+        this.client = new PulsarClientImpl(clientConf);
+        this.componentId = componentId;
+    }
+
+    /**
+     * Provides a shared pulsar client that is shared across all different tasks
+     * in the same component. Different components will not share the pulsar
+     * client since they can have different configurations.
+     *
+     * @param componentId
+     *            - the id of the spout/bolt
+     * @param clientConf
+     *            - client config, used only when the client for this component
+     *            does not exist yet
+     * @return SharedPulsarClient
+     * @throws PulsarClientException
+     *             in case of an error
+     */
+    public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
+            throws PulsarClientException {
+        // the mapping function cannot throw a checked exception, so a creation
+        // failure is carried out of the lambda through this holder
+        AtomicReference<PulsarClientException> exception = new AtomicReference<>();
+        SharedPulsarClient instance = instances.computeIfAbsent(componentId, id -> {
+            try {
+                SharedPulsarClient created = new SharedPulsarClient(id, clientConf);
+                LOG.info("[{}] Created a new Pulsar Client.", id);
+                return created;
+            } catch (PulsarClientException e) {
+                exception.set(e);
+                // returning null leaves the map without a mapping for this id
+                return null;
+            }
+        });
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+        // return what computeIfAbsent produced: looking the key up a second
+        // time could yield null if another task closed the client in between
+        return instance;
+    }
+
+    /**
+     * Returns the underlying client and counts the caller as one more user.
+     *
+     * @return the Pulsar client of this component
+     */
+    public PulsarClientImpl getClient() {
+        counter.incrementAndGet();
+        return client;
+    }
+
+    /**
+     * Returns the consumer shared by this component, subscribing on first use,
+     * and counts the caller as one more user.
+     *
+     * @param consumerConf
+     *            - configuration used when the consumer is created; ignored
+     *            (apart from logging) once a shared consumer exists
+     * @return the shared consumer
+     * @throws PulsarClientException
+     *             if subscribing fails
+     */
+    public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
+            throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (consumer == null) {
+                try {
+                    consumer = client.subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
+            } else {
+                LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
+            }
+            return consumer;
+        }
+    }
+
+    /**
+     * Returns the reader shared by this component, creating it on first use,
+     * and counts the caller as one more user.
+     *
+     * @param readerConf
+     *            - configuration used when the reader is created; ignored
+     *            (apart from logging) once a shared reader exists
+     * @return the shared reader
+     * @throws PulsarClientException
+     *             if the reader cannot be created
+     */
+    public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (reader == null) {
+                try {
+                    reader = client.createReaderAsync(readerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
+            } else {
+                LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
+            }
+            return reader;
+        }
+    }
+
+    /**
+     * Returns the producer shared by this component, creating it on first use,
+     * and counts the caller as one more user.
+     *
+     * @param producerConf
+     *            - configuration used when the producer is created; ignored
+     *            (apart from logging) once a shared producer exists
+     * @return the shared producer
+     * @throws PulsarClientException
+     *             if the producer cannot be created
+     */
+    public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (producer == null) {
+                try {
+                    producer = client.createProducerAsync(producerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
+            } else {
+                LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
+            }
+            return producer;
+        }
+    }
+
+    /**
+     * Releases one usage of this shared client. When the usage counter drops
+     * to zero or below, the client is closed and this instance is removed from
+     * the registry.
+     *
+     * @throws PulsarClientException
+     *             if closing the client fails; the instance is removed from
+     *             the registry even then
+     */
+    public void close() throws PulsarClientException {
+        if (counter.decrementAndGet() <= 0) {
+            try {
+                client.close();
+            } finally {
+                // never leave an instance whose close was attempted in the
+                // registry; the two-argument remove only drops the mapping if
+                // it still points at this instance
+                instances.remove(componentId, this);
+            }
+            LOG.info("[{}] Closed Pulsar Client", componentId);
+        }
+    }
+
+    /**
+     * Extracts the failure wrapped by {@code CompletableFuture.join()} as a
+     * {@link PulsarClientException}, wrapping it when it is of another type.
+     */
+    private static PulsarClientException unwrap(CompletionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof PulsarClientException) {
+            return (PulsarClientException) cause;
+        }
+        return new PulsarClientException(cause != null ? cause : e);
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
new file mode 100644
index 0000000..452e0ce
--- /dev/null
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Maps a Storm {@link Tuple} to the Pulsar message published by the bolt and declares the
+ * bolt's output fields.
+ */
+public interface TupleToMessageMapper extends Serializable {
+
+    /**
+     * Convert tuple to {@link org.apache.pulsar.client.api.Message}.
+     *
+     * <p>The default implementation returns {@code null}.
+     *
+     * @param tuple the tuple to convert
+     * @return the converted message; {@code null} unless overridden
+     * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)}
+     */
+    @Deprecated
+    default Message<byte[]> toMessage(Tuple tuple) {
+        return null;
+    }
+
+    /**
+     * Set the value on a message builder to prepare the message to be published from the Bolt.
+     *
+     * <p>The default implementation exists for backward compatibility: it delegates to the
+     * deprecated {@link #toMessage(Tuple)} and copies that message's payload, properties and
+     * (when present) key onto {@code msgBuilder}.
+     *
+     * @param msgBuilder the builder to populate
+     * @param tuple the tuple to convert
+     * @return {@code msgBuilder} once populated, or {@code null} when {@link #toMessage(Tuple)}
+     *         produced no message
+     */
+    default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+        // Default implementation provided for backward compatibility
+        Message<byte[]> msg = toMessage(tuple);
+        if (msg == null) {
+            // The legacy mapper produced nothing (its default returns null); report "no message"
+            // instead of failing with a NullPointerException below.
+            return null;
+        }
+        msgBuilder.value(msg.getData())
+            .properties(msg.getProperties());
+        if (msg.hasKey()) {
+            msgBuilder.key(msg.getKey());
+        }
+        return msgBuilder;
+    }
+
+
+    /**
+     * Declare the output schema for the bolt.
+     *
+     * @param declarer the declarer on which the output fields are registered
+     */
+    public void declareOutputFields(OutputFieldsDeclarer declarer);
+}
diff --git a/pulsar-storm/src/main/javadoc/overview.html 
b/pulsar-storm/src/main/javadoc/overview.html
new file mode 100644
index 0000000..a1595eb
--- /dev/null
+++ b/pulsar-storm/src/main/javadoc/overview.html
@@ -0,0 +1,29 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<HTML>
+  <HEAD>
+    <TITLE>Pulsar Storm API Overview</TITLE>
+  </HEAD>
+  <BODY>
+    The Pulsar Storm API is a proprietary messaging API.
+  </BODY>
+</HTML>
diff --git 
a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java 
b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..e6cbc51
--- /dev/null
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Unit tests for {@link PulsarSpout} that run against Mockito mocks rather than a broker.
+ */
+public class PulsarSpoutTest {
+
+    /**
+     * Fails and then acks the same message on a spout whose consumer is a mock, and checks
+     * that emitting the next tuple still polls the consumer at least once.
+     */
+    @Test
+    public void testAckFailedMessage() throws Exception {
+
+        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setSubscriptionName("sub1");
+        conf.setTopic("persistent://prop/ns1/topic1");
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+            @Override
+            public Values toValues(Message<byte[]> msg) {
+                return null;
+            }
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            }
+
+        });
+
+        ClientBuilder builder = spy(new ClientBuilderImpl());
+        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                new byte[0], Schema.BYTES, new MessageMetadata());
+        Consumer<byte[]> consumer = mock(Consumer.class);
+        SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        future.complete(null);
+        doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
+        // open() is never called in this test, so the consumer is injected reflectively.
+        Field consField = PulsarSpout.class.getDeclaredField("consumer");
+        consField.setAccessible(true);
+        consField.set(spout, spoutConsumer);
+
+        spout.fail(msg);
+        spout.ack(msg);
+        spout.emitNextAvailableTuple();
+        verify(consumer, atLeast(1)).receive(anyInt(), any());
+    }
+
+    /** Runs the spout scenario with a message that the mapper turns into a {@link PulsarTuple}. */
+    @Test
+    public void testPulsarTuple() throws Exception {
+        testPulsarSpout(true);
+    }
+
+    /** Runs the spout scenario with a message that the mapper turns into plain {@link Values}. */
+    @Test
+    public void testPulsarSpout() throws Exception {
+        testPulsarSpout(false);
+    }
+
+    /**
+     * Opens a spout backed by a mocked {@link SharedPulsarClient}, feeds it one message and
+     * verifies what is emitted on the collector.
+     *
+     * @param pulsarTuple when {@code true} the payload is {@code "stream:pstream"} and the
+     *        emit is expected on stream {@code "pstream"}; otherwise the payload is
+     *        {@code "test"} and the emit is expected without a stream id
+     */
+    public void testPulsarSpout(boolean pulsarTuple) throws Exception {
+        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setSubscriptionName("sub1");
+        conf.setTopic("persistent://prop/ns1/topic1");
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setSharedConsumerEnabled(true);
+        AtomicBoolean called = new AtomicBoolean(false);
+        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+            @Override
+            public Values toValues(Message<byte[]> msg) {
+                called.set(true);
+                if ("message to be dropped".equals(new String(msg.getData()))) {
+                    return null;
+                }
+                String val = new String(msg.getData());
+                if (val.startsWith("stream:")) {
+                    String stream = val.split(":")[1];
+                    return new PulsarTuple(stream, val);
+                }
+                return new Values(val);
+            }
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            }
+
+        });
+
+        String msgContent = pulsarTuple ? "stream:pstream" : "test";
+
+        ClientBuilder builder = spy(new ClientBuilderImpl());
+        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+        TopologyContext context = mock(TopologyContext.class);
+        final String componentId = "test-component-id";
+        doReturn(componentId).when(context).getThisComponentId();
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map config = new HashMap<>();
+        // Register a mocked shared client under the component id in the static registry
+        // before the spout is opened.
+        Field field = SharedPulsarClient.class.getDeclaredField("instances");
+        field.setAccessible(true);
+        ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field
+                .get(SharedPulsarClient.class);
+
+        SharedPulsarClient client = mock(SharedPulsarClient.class);
+        Consumer<byte[]> consumer = mock(Consumer.class);
+        when(client.getSharedConsumer(any())).thenReturn(consumer);
+        instances.put(componentId, client);
+
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                msgContent.getBytes(), Schema.BYTES, new MessageMetadata());
+        when(consumer.receive(anyInt(), any())).thenReturn(msg);
+
+        spout.open(config, context, collector);
+        spout.emitNextAvailableTuple();
+
+        assertTrue(called.get());
+        verify(consumer, atLeast(1)).receive(anyInt(), any());
+        ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
+        if (pulsarTuple) {
+            verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
+        } else {
+            verify(collector, times(1)).emit(capt.capture(), eq(msg));
+        }
+        Values vals = capt.getValue();
+        assertEquals(msgContent, vals.get(0));
+    }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 0844bae..18ccb15 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,6 +33,7 @@
   <name>Apache Pulsar Adapters :: Tests</name>
   <modules>
     <module>pulsar-kafka-compat-client-test</module>
+    <module>pulsar-storm-test</module>
     <module>pulsar-spark-test</module>
   </modules>
   <build>
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
new file mode 100644
index 0000000..3134328
--- /dev/null
+++ b/tests/pulsar-storm-test/pom.xml
@@ -0,0 +1,131 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar.tests</groupId>
+    <artifactId>adapters-tests-parent</artifactId>
+    <version>2.11.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-storm-test</artifactId>
+  <packaging>jar</packaging>
+  <name>Pulsar Storm adapter Tests</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-storm</artifactId>
+      <version>2.11.0-SNAPSHOT</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-server</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>buildtools</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>testmocks</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
new file mode 100644
index 0000000..4355ad6
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Recording {@link IOutputCollector} for tests: it remembers the outcome of the most recent
+ * ack/fail call, the last reported error, the last acked tuple and the number of acks.
+ * Emit calls are ignored.
+ */
+public class MockOutputCollector implements IOutputCollector {
+
+    private Throwable reportedError;
+    private Tuple lastAckedTuple;
+    private int ackCount;
+    private boolean ackSeen;
+    private boolean failSeen;
+
+    /** Remembers the error so tests can read it back via {@link #getLastError()}. */
+    @Override
+    public void reportError(Throwable error) {
+        this.reportedError = error;
+    }
+
+    /** Ignores the tuple and always returns {@code null}. */
+    @Override
+    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        return null;
+    }
+
+    /** Ignores the tuple. */
+    @Override
+    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+    }
+
+    /** Marks the collector as acked (clearing the failed flag) and counts the ack. */
+    @Override
+    public void ack(Tuple input) {
+        this.ackSeen = true;
+        this.failSeen = false;
+        this.lastAckedTuple = input;
+        this.ackCount += 1;
+    }
+
+    /** Marks the collector as failed and clears the acked flag. */
+    @Override
+    public void fail(Tuple input) {
+        this.failSeen = true;
+        this.ackSeen = false;
+    }
+
+    /** No-op. */
+    @Override
+    public void resetTimeout(Tuple tuple) {
+
+    }
+
+    /** @return {@code true} if the most recent ack/fail call was an ack */
+    public boolean acked() {
+        return this.ackSeen;
+    }
+
+    /** @return {@code true} if the most recent ack/fail call was a fail */
+    public boolean failed() {
+        return this.failSeen;
+    }
+
+    /** @return the last error passed to {@link #reportError(Throwable)}, or {@code null} */
+    public Throwable getLastError() {
+        return this.reportedError;
+    }
+
+    /** @return the tuple passed to the most recent {@link #ack(Tuple)}, or {@code null} */
+    public Tuple getAckedTuple() {
+        return this.lastAckedTuple;
+    }
+
+    /** @return how many times {@link #ack(Tuple)} was called since creation or the last reset */
+    public int getNumTuplesAcked() {
+        return this.ackCount;
+    }
+
+    /** Restores every recorded value to its initial state. */
+    public void reset() {
+        this.ackSeen = false;
+        this.failSeen = false;
+        this.reportedError = null;
+        this.lastAckedTuple = null;
+        this.ackCount = 0;
+    }
+
+    @Override
+    public void flush() {
+        // Nothing is buffered, so there is nothing to flush
+    }
+
+}
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
new file mode 100644
index 0000000..98c8d20
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Recording {@link ISpoutOutputCollector} for tests: it remembers whether anything was
+ * emitted, the first field of the last emitted tuple (as a String) and the message id object
+ * of the last emit (as a {@link Message}).
+ */
+public class MockSpoutOutputCollector implements ISpoutOutputCollector {
+
+    private boolean anyEmit;
+    private Message lastEmittedMessage;
+    private String firstField;
+
+    /** Records the emit and returns a new empty list of task ids. */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        capture(tuple, messageId);
+        return new ArrayList<>();
+    }
+
+    /** Records the emit; the task id and stream id are ignored. */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        capture(tuple, messageId);
+    }
+
+    /** Stores the emit flag, the tuple's first field and the message id, in that order. */
+    private void capture(List<Object> tuple, Object messageId) {
+        this.anyEmit = true;
+        this.firstField = (String) tuple.get(0);
+        this.lastEmittedMessage = (Message) messageId;
+    }
+
+    /** Always reports zero pending tuples. */
+    @Override
+    public long getPendingCount() {
+        return 0;
+    }
+
+    /** Ignores the error. */
+    @Override
+    public void reportError(Throwable error) {
+    }
+
+    /** @return {@code true} once any emit has been recorded since creation or the last reset */
+    public boolean emitted() {
+        return this.anyEmit;
+    }
+
+    /** @return the first field of the last emitted tuple, or {@code null} */
+    public String getTupleData() {
+        return this.firstField;
+    }
+
+    /** @return the message id object of the last emit, or {@code null} */
+    public Message getLastMessage() {
+        return this.lastEmittedMessage;
+    }
+
+    /** Forgets everything recorded so far. */
+    public void reset() {
+        this.anyEmit = false;
+        this.firstField = null;
+        this.lastEmittedMessage = null;
+    }
+
+    @Override
+    public void flush() {
+        // Nothing is buffered, so there is nothing to flush
+    }
+}
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
new file mode 100644
index 0000000..b90e855
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * Tests for {@link PulsarBolt} built on {@link ProducerConsumerBase}.
+ *
+ * <p>Each test method gets a freshly prepared bolt writing to {@link #topic} and a consumer
+ * subscribed to the same topic, so published messages can be read back.
+ * NOTE(review): {@code pulsar}, {@code admin}, {@code pulsarClient} and {@code methodName}
+ * are not declared in this class; presumably they are inherited from the base class - confirm.
+ */
+public class PulsarBoltTest extends ProducerConsumerBase {
+
+    // Maximum number of one-second polls while waiting for asynchronous acks.
+    private static final int NO_OF_RETRIES = 10;
+
+    public String serviceUrl;
+    public final String topic = "persistent://my-property/my-ns/my-topic1";
+    public final String subscriptionName = "my-subscriber-name";
+
+    protected PulsarBoltConfiguration pulsarBoltConf;
+    protected PulsarBolt bolt;
+    protected MockOutputCollector mockCollector;
+    protected Consumer consumer;
+
+    /** Runs the base-class per-method hook, then builds the fixture via {@link #setup()}. */
+    @Override
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        super.beforeMethod(m);
+        setup();
+    }
+
+    /**
+     * Runs the base-class setup, configures and prepares the bolt under test with a mocked
+     * {@link TopologyContext} (component id {@code "test-bolt-" + methodName}, task id 0) and
+     * subscribes {@link #consumer} to {@link #topic}.
+     */
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        serviceUrl = pulsar.getWebServiceAddress();
+
+        pulsarBoltConf = new PulsarBoltConfiguration();
+        pulsarBoltConf.setServiceUrl(serviceUrl);
+        pulsarBoltConf.setTopic(topic);
+        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
+        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
+        bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
+        mockCollector = new MockOutputCollector();
+        OutputCollector collector = new OutputCollector(mockCollector);
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("test-bolt-" + 
methodName);
+        when(context.getThisTaskId()).thenReturn(0);
+        bolt.prepare(Maps.newHashMap(), context, collector);
+        consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
+    }
+
+    /** Closes the bolt and the consumer, then runs the base-class cleanup. */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        bolt.close();
+        consumer.close();
+        super.internalCleanup();
+    }
+
+    /**
+     * Mapper shared by the tests. The tuple's first binary field selects the behavior:
+     * {@code "message to be dropped"} returns {@code null}, {@code "throw exception"} throws
+     * a {@link RuntimeException}, anything else becomes the message value.
+     */
+    @SuppressWarnings("serial")
+    static TupleToMessageMapper tupleToMessageMapper = new 
TupleToMessageMapper() {
+
+        @Override
+        public TypedMessageBuilder<byte[]> 
toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+            if ("message to be dropped".equals(new 
String(tuple.getBinary(0)))) {
+                return null;
+            }
+            if ("throw exception".equals(new String(tuple.getBinary(0)))) {
+                throw new RuntimeException();
+            }
+            return msgBuilder.value(tuple.getBinary(0));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    };
+
+    /**
+     * Builds a mocked tuple whose binary field 0 holds the bytes of {@code msgContent} and
+     * whose source component and stream id are empty strings.
+     */
+    private Tuple getMockTuple(String msgContent) {
+        Tuple mockTuple = mock(Tuple.class);
+        when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
+        when(mockTuple.getSourceComponent()).thenReturn("");
+        when(mockTuple.getSourceStreamId()).thenReturn("");
+        return mockTuple;
+    }
+
+    /**
+     * Executes one tuple, waits (up to {@link #NO_OF_RETRIES} one-second polls) for the ack,
+     * then checks the collector state and that the consumer receives the same payload.
+     */
+    @Test
+    public void testBasic() throws Exception {
+        String msgContent = "hello world";
+        Tuple tuple = getMockTuple(msgContent);
+        bolt.execute(tuple);
+        for (int i = 0; i < NO_OF_RETRIES; i++) {
+            Thread.sleep(1000);
+            if (mockCollector.acked()) {
+                break;
+            }
+        }
+        Assert.assertTrue(mockCollector.acked());
+        Assert.assertFalse(mockCollector.failed());
+        Assert.assertNull(mockCollector.getLastError());
+        Assert.assertEquals(tuple, mockCollector.getAckedTuple());
+        Message msg = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledge(msg);
+        Assert.assertEquals(msgContent, new String(msg.getData()));
+    }
+
+    /**
+     * A mapper exception is expected to leave the tuple failed (not acked) with an error
+     * reported on the collector, checked immediately after {@code execute} returns.
+     */
+    @Test
+    public void testExecuteFailure() throws Exception {
+        String msgContent = "throw exception";
+        Tuple tuple = getMockTuple(msgContent);
+        bolt.execute(tuple);
+        Assert.assertFalse(mockCollector.acked());
+        Assert.assertTrue(mockCollector.failed());
+        Assert.assertNotNull(mockCollector.getLastError());
+    }
+
+    /**
+     * When the mapper returns {@code null} the tuple is expected to be acked and no message
+     * to arrive on the topic within five seconds.
+     */
+    @Test
+    public void testNoMessageSend() throws Exception {
+        String msgContent = "message to be dropped";
+        Tuple tuple = getMockTuple(msgContent);
+        bolt.execute(tuple);
+        Assert.assertTrue(mockCollector.acked());
+        Message msg = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+    }
+
+    /**
+     * Sends ten tuples, waits for ten acks, then checks the reported message count, rate and
+     * throughput (each expected as total / metrics interval) and that the count reads zero
+     * after {@code getValueAndReset()}. Finally drains the ten messages from the consumer.
+     */
+    @Test
+    public void testMetrics() throws Exception {
+        bolt.resetMetrics();
+        String msgContent = "hello world";
+        Tuple tuple = getMockTuple(msgContent);
+        for (int i = 0; i < 10; i++) {
+            bolt.execute(tuple);
+        }
+        for (int i = 0; i < NO_OF_RETRIES; i++) {
+            Thread.sleep(1000);
+            if (mockCollector.getNumTuplesAcked() == 10) {
+                break;
+            }
+        }
+        @SuppressWarnings("rawtypes")
+        Map metrics = (Map) bolt.getValueAndReset();
+        Assert.assertEquals(((Long) 
metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10);
+        Assert.assertEquals(((Double) 
metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
+                10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs());
+        Assert.assertEquals(((Double) 
metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
+                ((double) msgContent.getBytes().length * 10) / 
pulsarBoltConf.getMetricsTimeIntervalInSecs());
+        metrics = bolt.getMetrics();
+        Assert.assertEquals(((Long) 
metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
+        for (int i = 0; i < 10; i++) {
+            Message msg = consumer.receive(5, TimeUnit.SECONDS);
+            consumer.acknowledge(msg);
+        }
+    }
+
+    /**
+     * Prepares a second bolt with the same component id (task id 1) and checks that the
+     * topic reports exactly one publisher before it is prepared, while it is open, and after
+     * it is closed.
+     * NOTE(review): presumably both bolts share one producer through SharedPulsarClient,
+     * keyed by component id - confirm against PulsarBolt.prepare.
+     */
+    @Test
+    public void testSharedProducer() throws Exception {
+        TopicStats topicStats = admin.topics().getStats(topic);
+        Assert.assertEquals(topicStats.getPublishers().size(), 1);
+        PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, 
PulsarClient.builder());
+        MockOutputCollector otherMockCollector = new MockOutputCollector();
+        OutputCollector collector = new OutputCollector(otherMockCollector);
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("test-bolt-" + 
methodName);
+        when(context.getThisTaskId()).thenReturn(1);
+        otherBolt.prepare(Maps.newHashMap(), context, collector);
+
+        topicStats = admin.topics().getStats(topic);
+        Assert.assertEquals(topicStats.getPublishers().size(), 1);
+
+        otherBolt.close();
+
+        topicStats = admin.topics().getStats(topic);
+        Assert.assertEquals(topicStats.getPublishers().size(), 1);
+    }
+
+    /** Passes an unprepared bolt (no auth configured) to {@code TestUtil.testSerializability}. */
+    @Test
+    public void testSerializability() throws Exception {
+        // test serializability with no auth
+        PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, 
PulsarClient.builder());
+        TestUtil.testSerializability(boltWithNoAuth);
+    }
+
+    /**
+     * Preparing a bolt configured with the topic {@code "persistent://invalid"} is expected
+     * to throw {@link IllegalStateException}. The locals here deliberately shadow the
+     * fixture fields of the same names.
+     */
+    @Test
+    public void testFailedProducer() {
+        PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration();
+        pulsarBoltConf.setServiceUrl(serviceUrl);
+        pulsarBoltConf.setTopic("persistent://invalid");
+        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
+        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
+        PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, 
PulsarClient.builder());
+        MockOutputCollector mockCollector = new MockOutputCollector();
+        OutputCollector collector = new OutputCollector(mockCollector);
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("new" + methodName);
+        when(context.getThisTaskId()).thenReturn(0);
+        try {
+            bolt.prepare(Maps.newHashMap(), context, collector);
+            fail("should have failed as producer creation failed");
+        } catch (IllegalStateException ie) {
+            // Ok.
+        }
+    }
+}
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..322e41b
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+public class PulsarSpoutTest extends ProducerConsumerBase {
+
+    public String serviceUrl;
+    public final String topic = "persistent://my-property/my-ns/my-topic1";
+    public final String subscriptionName = "my-subscriber-name";
+
+    protected PulsarSpoutConfiguration pulsarSpoutConf;
+    protected PulsarSpout spout;
+    protected MockSpoutOutputCollector mockCollector;
+    protected Producer producer;
+
+    @Override
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        super.beforeMethod(m); // base-class per-method hook runs first
+        setup(); // then (re)create the spout, collector and producer used by the test
+    }
+
+    @Override
+    protected void setup() throws Exception {
+        // Base-class environment first; everything below talks to it.
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        serviceUrl = pulsar.getWebServiceAddress();
+
+        // Configuration of the spout under test.
+        pulsarSpoutConf = new PulsarSpoutConfiguration();
+        pulsarSpoutConf.setServiceUrl(serviceUrl);
+        pulsarSpoutConf.setTopic(topic);
+        pulsarSpoutConf.setSubscriptionName(subscriptionName);
+        pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
+        pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
+        pulsarSpoutConf.setMaxFailedRetries(2);
+        pulsarSpoutConf.setSharedConsumerEnabled(true);
+        pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
+        pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
+        spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+
+        // Open the spout as task 0 of a mocked topology; emissions are captured by mockCollector.
+        mockCollector = new MockSpoutOutputCollector();
+        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
+        when(context.getThisTaskId()).thenReturn(0);
+        spout.open(Maps.newHashMap(), context, collector);
+
+        // Producer the tests use to feed the topic the spout consumes.
+        producer = pulsarClient.newProducer().topic(topic).create();
+    }
+
+    /**
+     * Releases the per-test producer and spout and tears down the base-class environment.
+     *
+     * <p>The steps are nested in {@code finally} blocks so that a failure while closing the
+     * producer or the spout does not skip the remaining teardown and leak resources into the
+     * next test method.
+     */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        try {
+            producer.close();
+        } finally {
+            try {
+                spout.close();
+            } finally {
+                super.internalCleanup();
+            }
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static MessageToValuesMapper messageToValuesMapper = new 
MessageToValuesMapper() {
+
+        @Override
+        public Values toValues(Message msg) {
+            if ("message to be dropped".equals(new String(msg.getData()))) {
+                return null;
+            }
+            return new Values(new String(msg.getData()));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    };
+
+    @Test
+    public void testBasic() throws Exception {
+        // Publish one message, pull it through the spout and ack the emitted tuple.
+        final String payload = "hello world";
+        producer.send(payload.getBytes());
+
+        spout.nextTuple();
+
+        assertTrue(mockCollector.emitted());
+        assertEquals(mockCollector.getTupleData(), payload);
+        spout.ack(mockCollector.getLastMessage());
+    }
+
+    @Test
+    public void testRedeliverOnFail() throws Exception {
+        final String payload = "hello world";
+        producer.send(payload.getBytes());
+
+        // First delivery: the tuple is reported as failed.
+        spout.nextTuple();
+        spout.fail(mockCollector.getLastMessage());
+        mockCollector.reset();
+
+        // Wait a little before asking the spout for the next tuple.
+        Thread.sleep(150);
+
+        // The same payload is expected to be emitted again; this time it is acked.
+        spout.nextTuple();
+        assertTrue(mockCollector.emitted());
+        assertEquals(mockCollector.getTupleData(), payload);
+        spout.ack(mockCollector.getLastMessage());
+    }
+
+    @Test
+    public void testNoRedeliverOnAck() throws Exception {
+        final String payload = "hello world";
+        producer.send(payload.getBytes());
+
+        // Deliver and ack the only message.
+        spout.nextTuple();
+        spout.ack(mockCollector.getLastMessage());
+        mockCollector.reset();
+
+        // A further poll must not emit anything.
+        spout.nextTuple();
+        assertFalse(mockCollector.emitted());
+        assertNull(mockCollector.getTupleData());
+    }
+
+    @Test
+    public void testLimitedRedeliveriesOnTimeout() throws Exception {
+        final String payload = "chuck norris";
+        producer.send(payload.getBytes());
+
+        // Keep failing the tuple until the configured failed-retries timeout has elapsed.
+        final long deadline = System.currentTimeMillis()
+                + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS);
+        while (System.currentTimeMillis() < deadline) {
+            mockCollector.reset();
+            spout.nextTuple();
+            assertTrue(mockCollector.emitted());
+            assertEquals(mockCollector.getTupleData(), payload);
+            spout.fail(mockCollector.getLastMessage());
+            // wait to avoid backoff
+            Thread.sleep(500);
+        }
+
+        // One more poll/fail cycle after the timeout window has passed.
+        spout.nextTuple();
+        spout.fail(mockCollector.getLastMessage());
+        mockCollector.reset();
+        Thread.sleep(500);
+
+        // From now on the message must no longer be emitted.
+        spout.nextTuple();
+        assertFalse(mockCollector.emitted());
+        assertNull(mockCollector.getTupleData());
+    }
+
+    @Test
+    public void testLimitedRedeliveriesOnCount() throws Exception {
+        final String payload = "hello world";
+        producer.send(payload.getBytes());
+
+        // Three emit-and-fail rounds, each followed by a longer pause than the previous one.
+        final long[] pauseAfterFailureMillis = { 150, 300, 500 };
+        for (long pauseMillis : pauseAfterFailureMillis) {
+            spout.nextTuple();
+            assertTrue(mockCollector.emitted());
+            assertEquals(mockCollector.getTupleData(), payload);
+            spout.fail(mockCollector.getLastMessage());
+
+            mockCollector.reset();
+            Thread.sleep(pauseMillis);
+        }
+
+        // After the third failure the message must not be emitted again.
+        spout.nextTuple();
+        assertFalse(mockCollector.emitted());
+        assertNull(mockCollector.getTupleData());
+    }
+
+    @Test
+    public void testBackoffOnRetry() throws Exception {
+        final String payload = "chuck norris";
+        producer.send(payload.getBytes());
+
+        // Deliver once and fail the tuple.
+        spout.nextTuple();
+        spout.fail(mockCollector.getLastMessage());
+        mockCollector.reset();
+
+        // due to backoff we should not get the message again immediately
+        spout.nextTuple();
+        assertFalse(mockCollector.emitted());
+        assertNull(mockCollector.getTupleData());
+
+        // After a short wait the message is expected to come back and can be acked.
+        Thread.sleep(100);
+        spout.nextTuple();
+        assertTrue(mockCollector.emitted());
+        assertEquals(mockCollector.getTupleData(), payload);
+        spout.ack(mockCollector.getLastMessage());
+    }
+
+    @Test
+    public void testMessageDrop() throws Exception {
+        // The test mapper returns null for this exact payload, so nothing may be emitted.
+        final String droppedPayload = "message to be dropped";
+        producer.send(droppedPayload.getBytes());
+
+        spout.nextTuple();
+
+        assertFalse(mockCollector.emitted());
+        assertNull(mockCollector.getTupleData());
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Test
+    public void testMetrics() throws Exception {
+        spout.resetMetrics();
+        final String payload = "hello world";
+        producer.send(payload.getBytes());
+
+        // Phase 1: first emit -> one message received and emitted, one ack pending.
+        spout.nextTuple();
+        Map metrics = spout.getMetrics();
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
+        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
+                1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
+                ((double) payload.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+
+        // Phase 2: tuple failed -> it moves from pending acks to pending failed messages.
+        spout.fail(mockCollector.getLastMessage());
+        metrics = spout.getMetrics();
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
+
+        // Phase 3: re-emit after a short wait -> emitted count grows, received count does not.
+        Thread.sleep(150);
+        spout.nextTuple();
+        metrics = spout.getMetrics();
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
+
+        // Phase 4: ack -> nothing pending; the values are read through getValueAndReset().
+        spout.ack(mockCollector.getLastMessage());
+        metrics = (Map) spout.getValueAndReset();
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
+        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
+    }
+
+    @Test
+    public void testSharedConsumer() throws Exception {
+        // Baseline: the spout opened in setup() is the only consumer on the subscription.
+        TopicStats topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+        // Open a second spout (task 1) with the same configuration and component id.
+        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+        SpoutOutputCollector collector = new SpoutOutputCollector(new MockSpoutOutputCollector());
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
+        when(context.getThisTaskId()).thenReturn(1);
+        otherSpout.open(Maps.newHashMap(), context, collector);
+
+        // Shared consumer enabled: the consumer count is expected to stay at one.
+        topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+        otherSpout.close();
+
+        // Closing the second spout must leave the remaining consumer in place.
+        topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+    }
+
+    @Test
+    public void testNoSharedConsumer() throws Exception {
+        // Baseline: the spout opened in setup() is the only consumer on the subscription.
+        TopicStats topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+        // Open a second spout (task 1) with consumer sharing switched off.
+        pulsarSpoutConf.setSharedConsumerEnabled(false);
+        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+        SpoutOutputCollector collector = new SpoutOutputCollector(new MockSpoutOutputCollector());
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
+        when(context.getThisTaskId()).thenReturn(1);
+        otherSpout.open(Maps.newHashMap(), context, collector);
+
+        // Without sharing, the second spout is expected to add its own consumer.
+        topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 2);
+
+        otherSpout.close();
+
+        // Closing it must bring the count back to one.
+        topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+    }
+
+    @Test
+    public void testSerializability() throws Exception {
+        // A spout created without any authentication settings must be writable via Java serialization.
+        PulsarSpout unauthenticatedSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+        TestUtil.testSerializability(unauthenticatedSpout);
+    }
+
+    @Test
+    public void testFailedConsumer() {
+        // Dedicated configuration pointing at a topic name for which consumer creation should fail.
+        PulsarSpoutConfiguration invalidTopicConf = new PulsarSpoutConfiguration();
+        invalidTopicConf.setServiceUrl(serviceUrl);
+        invalidTopicConf.setTopic("persistent://invalidTopic");
+        invalidTopicConf.setSubscriptionName(subscriptionName);
+        invalidTopicConf.setMessageToValuesMapper(messageToValuesMapper);
+        invalidTopicConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
+        invalidTopicConf.setMaxFailedRetries(2);
+        invalidTopicConf.setSharedConsumerEnabled(false);
+        invalidTopicConf.setMetricsTimeIntervalInSecs(60);
+        invalidTopicConf.setSubscriptionType(SubscriptionType.Shared);
+        PulsarSpout failingSpout = new PulsarSpout(invalidTopicConf, PulsarClient.builder());
+
+        SpoutOutputCollector collector = new SpoutOutputCollector(new MockSpoutOutputCollector());
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getThisComponentId()).thenReturn("new-test" + methodName);
+        when(context.getThisTaskId()).thenReturn(0);
+
+        try {
+            failingSpout.open(Maps.newHashMap(), context, collector);
+        } catch (IllegalStateException expected) {
+            // Ok: open() surfaced the failed consumer creation.
+            return;
+        }
+        fail("should have failed as consumer creation failed");
+    }
+}
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
new file mode 100644
index 0000000..a71e088
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import org.testng.Assert;
+
+/**
+ * Shared helpers for the Pulsar Storm adapter tests.
+ */
+public class TestUtil {
+
+    /**
+     * Verifies that {@code object} can be written with Java serialization.
+     *
+     * <p>The meaningful check is that {@link ObjectOutputStream#writeObject(Object)} completes
+     * without throwing; the trailing length assertion only confirms that bytes were produced.
+     *
+     * @param object the instance to serialize
+     * @throws Exception if serialization fails, e.g. because a reachable field is not serializable
+     */
+    public static void testSerializability(Object object) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources closes (and thereby flushes) the stream even when writeObject throws.
+        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+            oos.writeObject(object);
+        }
+        Assert.assertTrue(out.toByteArray().length > 0);
+    }
+}
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
new file mode 100644
index 0000000..93404ea
--- /dev/null
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm.example;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.storm.MessageToValuesMapper;
+import org.apache.pulsar.storm.PulsarBolt;
+import org.apache.pulsar.storm.PulsarBoltConfiguration;
+import org.apache.pulsar.storm.PulsarSpout;
+import org.apache.pulsar.storm.PulsarSpoutConfiguration;
+import org.apache.pulsar.storm.TupleToMessageMapper;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormExample {
+    // Log under this example's own class name rather than PulsarSpout's.
+    private static final Logger LOG = LoggerFactory.getLogger(StormExample.class);
+    // Web service URL of the broker, used by the spout, the bolt and the verification client.
+    private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080";
+
+    @SuppressWarnings("serial")
+    static MessageToValuesMapper messageToValuesMapper = new 
MessageToValuesMapper() {
+
+        @Override
+        public Values toValues(Message msg) {
+            return new Values(new String(msg.getData()));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // declare the output fields
+            declarer.declare(new Fields("string"));
+        }
+    };
+
+    @SuppressWarnings("serial")
+    static TupleToMessageMapper tupleToMessageMapper = new 
TupleToMessageMapper() {
+
+        @Override
+        public TypedMessageBuilder<byte[]> 
toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+            String receivedMessage = tuple.getString(0);
+            // message processing
+            String processedMsg = receivedMessage + "-processed";
+            return msgBuilder.value(processedMsg.getBytes());
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // declare the output fields
+        }
+    };
+
+    public static void main(String[] args) throws Exception {
+        // String authPluginClassName = 
"org.apache.pulsar.client.impl.auth.MyAuthentication";
+        // String authParams = "key1:val1,key2:val2";
+        // clientConf.setAuthentication(authPluginClassName, authParams);
+
+        String topic1 = "persistent://my-property/use/my-ns/my-topic1";
+        String topic2 = "persistent://my-property/use/my-ns/my-topic2";
+        String subscriptionName1 = "my-subscriber-name1";
+        String subscriptionName2 = "my-subscriber-name2";
+
+        // create spout
+        PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
+        spoutConf.setServiceUrl(serviceUrl);
+        spoutConf.setTopic(topic1);
+        spoutConf.setSubscriptionName(subscriptionName1);
+        spoutConf.setMessageToValuesMapper(messageToValuesMapper);
+        PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
+
+        // create bolt
+        PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
+        boltConf.setServiceUrl(serviceUrl);
+        boltConf.setTopic(topic2);
+        boltConf.setTupleToMessageMapper(tupleToMessageMapper);
+        PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("testSpout", spout);
+        builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
+
+        Config conf = new Config();
+        conf.setNumWorkers(2);
+        conf.setDebug(true);
+        conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("test", conf, builder.createTopology());
+        Utils.sleep(10000);
+
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+        // create a consumer on topic2 to receive messages from the bolt when 
the processing is done
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
+        // create a producer on topic1 to send messages that will be received 
by the spout
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic1).create();
+
+        for (int i = 0; i < 10; i++) {
+            String msg = "msg-" + i;
+            producer.send(msg.getBytes());
+            LOG.info("Message {} sent", msg);
+        }
+        Message<byte[]> msg = null;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+            LOG.info("Message {} received", new String(msg.getData()));
+        }
+        cluster.killTopology("test");
+        cluster.shutdown();
+
+    }
+
+    /**
+     * No-op metrics consumer that {@code main} registers with the topology configuration.
+     *
+     * <p>Declared {@code public static} so that it has an accessible no-argument constructor and
+     * can be created from its class alone. A non-static inner class needs an enclosing
+     * {@code StormExample} instance and therefore cannot be instantiated reflectively.
+     */
+    public static class PulsarMetricsConsumer implements IMetricsConsumer {
+
+        @Override
+        public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
+                IErrorReporter errorReporter) {
+            // Nothing to initialise in this example.
+        }
+
+        @Override
+        public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+            // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology.
+            // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
+            // is
+            // "PulsarBoltMetrics-{componentId}-{taskIndex}".
+        }
+
+        @Override
+        public void cleanup() {
+            // Nothing to release in this example.
+        }
+
+    }
+}

Reply via email to