This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push: new 5aeddbb Add Strom adapter back after removing from Apache Storm repo (#55) 5aeddbb is described below commit 5aeddbb21508fa70d36199f95dabebe17774384c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Nov 1 10:24:57 2023 -0700 Add Strom adapter back after removing from Apache Storm repo (#55) This reverts commit ab537c13061b06de2044cd43965de930d762fe8a. --- .asf.yaml | 3 +- README.md | 4 +- pom.xml | 19 + pulsar-storm/pom.xml | 102 +++++ .../apache/pulsar/storm/MessageToValuesMapper.java | 44 ++ .../java/org/apache/pulsar/storm/PulsarBolt.java | 207 +++++++++ .../pulsar/storm/PulsarBoltConfiguration.java | 57 +++ .../java/org/apache/pulsar/storm/PulsarSpout.java | 494 +++++++++++++++++++++ .../pulsar/storm/PulsarSpoutConfiguration.java | 195 ++++++++ .../apache/pulsar/storm/PulsarSpoutConsumer.java | 58 +++ .../pulsar/storm/PulsarStormConfiguration.java | 90 ++++ .../java/org/apache/pulsar/storm/PulsarTuple.java | 45 ++ .../apache/pulsar/storm/SharedPulsarClient.java | 155 +++++++ .../apache/pulsar/storm/TupleToMessageMapper.java | 66 +++ pulsar-storm/src/main/javadoc/overview.html | 29 ++ .../org/apache/pulsar/storm/PulsarSpoutTest.java | 178 ++++++++ tests/pom.xml | 1 + tests/pulsar-storm-test/pom.xml | 131 ++++++ .../apache/pulsar/storm/MockOutputCollector.java | 101 +++++ .../pulsar/storm/MockSpoutOutputCollector.java | 80 ++++ .../org/apache/pulsar/storm/PulsarBoltTest.java | 236 ++++++++++ .../org/apache/pulsar/storm/PulsarSpoutTest.java | 349 +++++++++++++++ .../java/org/apache/pulsar/storm/TestUtil.java | 35 ++ .../apache/pulsar/storm/example/StormExample.java | 166 +++++++ 24 files changed, 2841 insertions(+), 4 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index c04e151..74589a2 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -27,6 +27,7 @@ github: - streaming - queuing - event-streaming + - apache-storm - apache-spark - apache-kafka features: @@ -47,4 +48,4 @@ github: 
notifications: commits: commits@pulsar.apache.org issues: commits@pulsar.apache.org - pullrequests: commits@pulsar.apache.org + pullrequests: commits@pulsar.apache.org \ No newline at end of file diff --git a/README.md b/README.md index 33409df..ce62338 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,6 @@ This repository is used for hosting all the adapters maintained and supported by [Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is supported and maintained by Apache Flink Community. -[Apache Storm bolt and spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are supported by Apache Storm Community. - ## Building In order to build this code you can simply use Maven @@ -44,5 +42,5 @@ git checkout v2.11.0 mvn clean install -DskipTests ``` -This is because this repository depends on test integration artifacts of the relative version on the main +This is because this repository depends on test integration artifacts of the relative version on the main Apache Pulsar codebase diff --git a/pom.xml b/pom.xml index 5f39f48..4f240d1 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,7 @@ <properties> <pulsar.version>2.11.0</pulsar.version> <kafka-client.version>2.7.2</kafka-client.version> + <storm.version>2.0.0</storm.version> <kafka_0_8.version>0.8.1.1</kafka_0_8.version> <avro.version>1.10.2</avro.version> <log4j.version>1.2.17</log4j.version> @@ -139,6 +140,7 @@ </properties> <modules> + <module>pulsar-storm</module> <module>pulsar-spark</module> <module>pulsar-client-kafka-compat</module> <module>pulsar-log4j2-appender</module> @@ -250,6 +252,22 @@ </exclusions> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-client</artifactId> + <version>${storm.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-server</artifactId> + <version>${storm.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + 
<artifactId>storm-core</artifactId> + <version>${storm.version}</version> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> @@ -1068,3 +1086,4 @@ </repositories> </project> + diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml new file mode 100644 index 0000000..a20649b --- /dev/null +++ b/pulsar-storm/pom.xml @@ -0,0 +1,102 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-adapters</artifactId> + <version>2.11.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>pulsar-storm</artifactId> + <name>Pulsar Storm adapter</name> + + <dependencies> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-common</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.yaml</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-client</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + </build> +</project> diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
new file mode 100644
index 0000000..92e127c
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Converts Pulsar messages received by {@link PulsarSpout} into Storm tuple values and declares the matching
+ * output fields.
+ * <p>
+ * Implementations must be {@link Serializable}, since this interface extends it.
+ * </p>
+ */
+public interface MessageToValuesMapper extends Serializable {
+
+    /**
+     * Converts a {@link org.apache.pulsar.client.api.Message} to tuple values.
+     * <p>
+     * Returning {@code null} makes {@link PulsarSpout} drop the message and ack it without emitting a tuple.
+     * Returning a {@link PulsarTuple} makes the spout emit the values on that tuple's output stream rather than
+     * the default stream.
+     * </p>
+     *
+     * @param msg the message received from Pulsar
+     * @return the values to emit, or {@code null} to drop the message
+     */
+    Values toValues(Message<byte[]> msg);
+
+    /**
+     * Declares the output schema for the spout.
+     *
+     * @param declarer the declarer to register the spout's output fields (and any output streams) with
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
new file mode 100644
index 0000000..32fa78f
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Storm bolt that publishes incoming tuples to a Pulsar topic.
+ * <p>
+ * Each tuple is converted with the configured {@link TupleToMessageMapper} and sent asynchronously. The tuple is
+ * acked when the send succeeds and failed when it does not. Tick tuples, and tuples the mapper converts to
+ * {@code null}, are acked without sending anything.
+ * </p>
+ */
+public class PulsarBolt extends BaseRichBolt implements IMetric {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class);
+
+    public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
+    public static final String PRODUCER_RATE = "producerRate";
+    public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
+
+    private final ClientConfigurationData clientConf;
+    private final ProducerConfigurationData producerConf;
+    private final PulsarBoltConfiguration pulsarBoltConf;
+    private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
+
+    private SharedPulsarClient sharedPulsarClient;
+    private String componentId;
+    private String boltId;
+    private OutputCollector collector;
+    private Producer<byte[]> producer;
+    // These counters are updated inside sendAsync() completion callbacks and zeroed by the metrics methods without
+    // holding the callback lock. A "++" on a volatile long is a non-atomic read-modify-write, so atomics are used
+    // to avoid lost updates.
+    private final AtomicLong messagesSent = new AtomicLong();
+    private final AtomicLong messageSizeSent = new AtomicLong();
+
+    /**
+     * Creates a bolt that uses a default {@link PulsarClient#builder()} client configuration.
+     *
+     * @param pulsarBoltConf bolt configuration; its service URL, topic and tuple mapper must be set
+     */
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+        this(pulsarBoltConf, PulsarClient.builder());
+    }
+
+    /**
+     * Creates a bolt whose client configuration is a copy of the one held by {@code clientBuilder}.
+     *
+     * @param pulsarBoltConf bolt configuration; its service URL, topic and tuple mapper must be set
+     * @param clientBuilder builder to copy client settings from; it must be a {@link ClientBuilderImpl}
+     */
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
+        this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+                new ProducerConfigurationData());
+    }
+
+    /**
+     * Creates a bolt from explicit client and producer configurations.
+     * <p>
+     * The service URL and topic from {@code pulsarBoltConf} are written into {@code clientConf} and
+     * {@code producerConf}, and the producer's batcher builder is cleared.
+     * </p>
+     *
+     * @param pulsarBoltConf bolt configuration
+     * @param clientConf Pulsar client configuration (mutated)
+     * @param producerConf Pulsar producer configuration (mutated)
+     * @throws NullPointerException if any argument, or the configured service URL, topic or tuple mapper, is null
+     */
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
+            ProducerConfigurationData producerConf) {
+        checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+        checkNotNull(clientConf, "client configuration can't be null");
+        checkNotNull(producerConf, "producer configuration can't be null");
+        Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+        Objects.requireNonNull(pulsarBoltConf.getTopic());
+        Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
+        this.pulsarBoltConf = pulsarBoltConf;
+        this.clientConf = clientConf;
+        this.producerConf = producerConf;
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+        this.producerConf.setBatcherBuilder(null);
+    }
+
+    /**
+     * Obtains the shared Pulsar client and producer for this component and registers the bolt's metrics.
+     *
+     * @throws IllegalStateException if the producer cannot be created
+     */
+    @SuppressWarnings({ "rawtypes" })
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        this.componentId = context.getThisComponentId();
+        this.boltId = String.format("%s-%s", componentId, context.getThisTaskId());
+        this.collector = collector;
+        try {
+            sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+            producer = sharedPulsarClient.getSharedProducer(producerConf);
+            LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic());
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
+            throw new IllegalStateException(
+                    format("Failed to initialize producer for %s : %s", pulsarBoltConf.getTopic(), e.getMessage()), e);
+        }
+        context.registerMetric(String.format("PulsarBoltMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
+                pulsarBoltConf.getMetricsTimeIntervalInSecs());
+    }
+
+    /**
+     * Maps {@code input} to a Pulsar message and sends it asynchronously.
+     * <p>
+     * The tuple is acked once the send succeeds. If the send fails, the error is reported and the tuple is failed.
+     * Tick tuples and tuples mapped to {@code null} are acked immediately.
+     * </p>
+     */
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            collector.ack(input);
+            return;
+        }
+        try {
+            if (producer != null) {
+                // a message key can be provided in the mapper
+                TypedMessageBuilder<byte[]> msgBuilder = pulsarBoltConf.getTupleToMessageMapper()
+                        .toMessage(producer.newMessage(), input);
+                if (msgBuilder == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("[{}] Cannot send null message, acking the collector", boltId);
+                    }
+                    collector.ack(input);
+                } else {
+                    // captured before sending so the completion callback can account for the payload size
+                    final long messageSizeToBeSent = ((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent()
+                            .remaining();
+                    msgBuilder.sendAsync().handle((msgId, ex) -> {
+                        // serialize completion-callback access to the collector
+                        synchronized (collector) {
+                            if (ex != null) {
+                                collector.reportError(ex);
+                                collector.fail(input);
+                                LOG.error("[{}] Message send failed", boltId, ex);
+                            } else {
+                                collector.ack(input);
+                                messagesSent.incrementAndGet();
+                                messageSizeSent.addAndGet(messageSizeToBeSent);
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("[{}] Message sent with id {}", boltId, msgId);
+                                }
+                            }
+                        }
+                        return null;
+                    });
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("[{}] Message processing failed", boltId, e);
+            collector.reportError(e);
+            collector.fail(input);
+        }
+    }
+
+    /**
+     * Closes the shared Pulsar client handle obtained in {@link #prepare}, if any. Failures are logged, not thrown.
+     */
+    public void close() {
+        try {
+            LOG.info("[{}] Closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic());
+            if (sharedPulsarClient != null) {
+                sharedPulsarClient.close();
+            }
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        close();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
+    }
+
+    /**
+     * Helpers for metrics.
+     */
+
+    /**
+     * Returns the current metric values without resetting the counters.
+     */
+    @SuppressWarnings({ "rawtypes" })
+    ConcurrentMap getMetrics() {
+        return publishMetrics(messagesSent.get(), messageSizeSent.get());
+    }
+
+    void resetMetrics() {
+        messagesSent.set(0);
+        messageSizeSent.set(0);
+    }
+
+    /**
+     * Returns the metrics for the elapsed interval and resets the counters.
+     * <p>
+     * Each counter is read and zeroed in a single atomic step. A send that completes concurrently is therefore
+     * counted in this interval or the next, never dropped.
+     * </p>
+     */
+    @Override
+    public Object getValueAndReset() {
+        return publishMetrics(messagesSent.getAndSet(0), messageSizeSent.getAndSet(0));
+    }
+
+    /** Writes the given counter snapshot, plus per-second rates, into the shared metrics map. */
+    private ConcurrentMap<String, Object> publishMetrics(long sent, long bytesSent) {
+        double intervalSecs = pulsarBoltConf.getMetricsTimeIntervalInSecs();
+        metricsMap.put(NO_OF_MESSAGES_SENT, sent);
+        metricsMap.put(PRODUCER_RATE, sent / intervalSecs);
+        metricsMap.put(PRODUCER_THROUGHPUT_BYTES, bytesSent / intervalSecs);
+        return metricsMap;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
new file mode 100644
index 0000000..714e435
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Objects;
+
+/**
+ * Configuration for {@link PulsarBolt}. It extends {@link PulsarStormConfiguration} with the mapper that turns
+ * Storm tuples into Pulsar messages.
+ */
+public class PulsarBoltConfiguration extends PulsarStormConfiguration {
+
+    private static final long serialVersionUID = 1L;
+
+    private TupleToMessageMapper tupleToMessageMapper;
+
+    /**
+     * Returns the mapper used to convert Storm tuples to Pulsar messages.
+     *
+     * @return the configured mapper, or {@code null} if none has been set yet
+     */
+    public TupleToMessageMapper getTupleToMessageMapper() {
+        return tupleToMessageMapper;
+    }
+
+    /**
+     * Sets the mapper used to convert Storm tuples to Pulsar messages.
+     * <p>
+     * Note: if the mapper returns {@code null} for a tuple, no message is sent and the tuple is acked immediately
+     * on the collector.
+     * </p>
+     *
+     * @param mapper the tuple-to-message mapper; must not be {@code null}
+     * @throws NullPointerException if {@code mapper} is {@code null}
+     */
+    public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
+        this.tupleToMessageMapper = Objects.requireNonNull(mapper, "tuple to message mapper can't be null");
+    }
+
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
new file mode 100644
index 0000000..8ed090e
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storm spout that emits messages received from a Pulsar topic.
+ * <p>
+ * With a durable subscription, the spout reads through a Pulsar {@link Consumer} and acknowledges each message once
+ * its tuple is acked. Otherwise it reads through a {@link Reader}, for which acknowledgement is a no-op.
+ * </p>
+ * <p>
+ * Failed tuples are re-emitted with backoff until the configured retry count or retry timeout is exhausted. After
+ * that they are acked and dropped.
+ * </p>
+ */
+public class PulsarSpout extends BaseRichSpout implements IMetric {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
+
+    public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
+    public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
+    public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
+    public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
+    public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
+    public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
+    public static final String CONSUMER_RATE = "consumerRate";
+    public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
+
+    private final ClientConfigurationData clientConf;
+    private final PulsarSpoutConfiguration pulsarSpoutConf;
+    private final ConsumerConfigurationData<byte[]> consumerConf;
+    private final long failedRetriesTimeoutNano;
+    private final int maxFailedRetries;
+    private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>();
+    private final Queue<Message<byte[]>> failedMessages = new ConcurrentLinkedQueue<>();
+    private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
+
+    private SharedPulsarClient sharedPulsarClient;
+    private String componentId;
+    private String spoutId;
+    private SpoutOutputCollector collector;
+    private PulsarSpoutConsumer consumer;
+    private volatile long messagesReceived = 0;
+    private volatile long messagesEmitted = 0;
+    private volatile long messagesFailed = 0;
+    private volatile long messageNotAvailableCount = 0;
+    private volatile long pendingAcks = 0;
+    private volatile long messageSizeReceived = 0;
+
+    /**
+     * Creates a spout that uses a default {@link PulsarClient#builder()} client configuration.
+     *
+     * @param pulsarSpoutConf spout configuration; service URL, topic, subscription name and mapper must be set
+     */
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
+        this(pulsarSpoutConf, PulsarClient.builder());
+    }
+
+    /**
+     * Creates a spout whose client configuration is a copy of the one held by {@code clientBuilder}.
+     *
+     * @param pulsarSpoutConf spout configuration
+     * @param clientBuilder builder to copy client settings from; it must be a {@link ClientBuilderImpl}
+     */
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
+        this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+                new ConsumerConfigurationData<byte[]>());
+    }
+
+    /**
+     * Creates a spout from explicit client and consumer configurations.
+     *
+     * @param pulsarSpoutConf spout configuration
+     * @param clientConfig Pulsar client configuration; its service URL is overwritten from {@code pulsarSpoutConf}
+     * @param consumerConfig Pulsar consumer configuration
+     * @throws NullPointerException if any argument, or the configured service URL, topic, subscription name or
+     *             message mapper, is null
+     */
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig,
+            ConsumerConfigurationData<byte[]> consumerConfig) {
+        // Validate the configuration objects before dereferencing them, so a null argument fails with a clear message.
+        checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
+        checkNotNull(clientConfig, "client configuration can't be null");
+        checkNotNull(consumerConfig, "consumer configuration can't be null");
+        Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+        Objects.requireNonNull(pulsarSpoutConf.getTopic());
+        Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+        Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        this.clientConf = clientConfig;
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf = consumerConfig;
+        this.pulsarSpoutConf = pulsarSpoutConf;
+        this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+    }
+
+    /**
+     * Optionally unsubscribes, then closes the consumer (unless it is shared) and the shared client handle.
+     * <p>
+     * Each step is attempted independently and failures are logged, so a failure closing the consumer does not
+     * leave the client open. Safe to call when {@link #open} was never called or failed.
+     * </p>
+     */
+    @Override
+    public void close() {
+        LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
+        try {
+            // consumer is null if open() was never called or could not create it
+            if (consumer != null) {
+                if (pulsarSpoutConf.isAutoUnsubscribe()) {
+                    try {
+                        consumer.unsubscribe();
+                    } catch (PulsarClientException e) {
+                        LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
+                                this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
+                    }
+                }
+                if (!pulsarSpoutConf.isSharedConsumerEnabled()) {
+                    try {
+                        consumer.close();
+                    } catch (PulsarClientException e) {
+                        LOG.error("[{}] Error closing Pulsar consumer for topic {}", spoutId,
+                                pulsarSpoutConf.getTopic(), e);
+                    }
+                }
+            }
+            if (sharedPulsarClient != null) {
+                try {
+                    sharedPulsarClient.close();
+                } catch (PulsarClientException e) {
+                    LOG.error("[{}] Error closing shared Pulsar client for topic {}", spoutId,
+                            pulsarSpoutConf.getTopic(), e);
+                }
+            }
+        } finally {
+            pendingMessageRetries.clear();
+            failedMessages.clear();
+        }
+    }
+
+    /**
+     * Acknowledges the Pulsar message behind an acked tuple and forgets its retry state.
+     */
+    @Override
+    public void ack(Object msgId) {
+        if (msgId instanceof Message) {
+            Message<?> msg = (Message<?>) msgId;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId());
+            }
+            consumer.acknowledgeAsync(msg);
+            pendingMessageRetries.remove(msg.getMessageId());
+            // The message should also leave failedMessages; emitFailedMessage() drops it lazily once it
+            // finds no retry state for it.
+            --pendingAcks;
+        }
+    }
+
+    /**
+     * Queues a failed tuple's message for re-emission while retries and retry time remain; otherwise acks and
+     * drops it.
+     */
+    @Override
+    public void fail(Object msgId) {
+        if (msgId instanceof Message) {
+            @SuppressWarnings("unchecked")
+            Message<byte[]> msg = (Message<byte[]>) msgId;
+            MessageId id = msg.getMessageId();
+            LOG.warn("[{}] Error processing message {}", spoutId, id);
+
+            // computeIfAbsent both looks up and registers the retry state, so no further put is needed.
+            MessageRetries messageRetries = pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries());
+            // Compare elapsed time rather than "start + timeout > now": that sum overflows for very large
+            // timeouts. System.nanoTime() values should only be compared by their difference.
+            boolean withinRetryTimeout = failedRetriesTimeoutNano < 0
+                    || System.nanoTime() - messageRetries.getTimeStamp() < failedRetriesTimeoutNano;
+            boolean withinRetryLimit = maxFailedRetries < 0 || messageRetries.getNumRetries() < maxFailedRetries;
+            if (withinRetryTimeout && withinRetryLimit) {
+                // since we can retry again, we increment retry count and put it in the queue
+                LOG.info("[{}] Putting message {} in the retry queue", spoutId, id);
+                messageRetries.incrementAndGet();
+                failedMessages.add(msg);
+                --pendingAcks;
+                messagesFailed++;
+            } else {
+                LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
+                ack(msg);
+            }
+        }
+    }
+
+    /**
+     * Emits the next tuple, preferring failed messages that are due for a retry over new messages from Pulsar.
+     */
+    @Override
+    public void nextTuple() {
+        emitNextAvailableTuple();
+    }
+
+    /**
+     * Emits the next available non-null tuple to the topology, if any message is available.
+     * <p>
+     * A failed message due for retry takes precedence. Otherwise messages are received from the consumer (100 ms
+     * timeout each) and mapped to tuples. Messages that map to {@code null} are acked and skipped, and receiving
+     * continues until a tuple is emitted or no message is available.
+     * </p>
+     */
+    public void emitNextAvailableTuple() {
+        // check if there are any failed messages to re-emit in the topology
+        if (emitFailedMessage()) {
+            return;
+        }
+
+        Message<byte[]> msg;
+        // receive from consumer if no failed messages
+        if (consumer != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId);
+            }
+            try {
+                boolean done = false;
+                while (!done) {
+                    msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        ++messagesReceived;
+                        messageSizeReceived += msg.getData().length;
+                        done = mapToValueAndEmit(msg);
+                    } else {
+                        // queue is empty and nothing to emit
+                        done = true;
+                        messageNotAvailableCount++;
+                    }
+                }
+            } catch (PulsarClientException e) {
+                LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e);
+            }
+        }
+    }
+
+    /**
+     * Handles the head of the failed-message queue.
+     *
+     * @return {@code true} if a failed message was re-emitted or the spout backed off (slept) before retrying one;
+     *         {@code false} if no failed message is pending
+     */
+    private boolean emitFailedMessage() {
+        Message<byte[]> msg;
+
+        while ((msg = failedMessages.peek()) != null) {
+            MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
+            if (messageRetries != null) {
+                // emit the tuple if the retry doesn't need backoff; otherwise sleep for the initial backoff
+                // interval and return without emitting anything
+                if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
+                        messageRetries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(),
+                        clientConf.getMaxBackoffIntervalNanos())) {
+                    Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
+                } else {
+                    // remove the message from the queue and emit it to the topology, only if it should
+                    // not be backed off
+                    LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
+                    failedMessages.remove();
+                    mapToValueAndEmit(msg);
+                }
+                return true;
+            }
+
+            // No retry state means the message was already acked and removed from pendingMessageRetries,
+            // so drop it from the failed-message queue as well and look at the next one.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
+                        pulsarSpoutConf.getTopic(), spoutId, msg.getMessageId());
+            }
+            failedMessages.remove();
+        }
+        return false;
+    }
+
+    /**
+     * Creates the Pulsar consumer or reader and registers the spout's metrics.
+     *
+     * @throws IllegalStateException if the consumer cannot be created
+     */
+    @Override
+    @SuppressWarnings({ "rawtypes" })
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.componentId = context.getThisComponentId();
+        this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId());
+        this.collector = collector;
+        pendingMessageRetries.clear();
+        failedMessages.clear();
+        try {
+            consumer = createConsumer();
+            LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
+                    pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
+        } catch (PulsarClientException e) {
+            LOG.error("[{}] Error creating pulsar consumer on topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
+            throw new IllegalStateException(format("Failed to initialize consumer for %s-%s : %s",
+                    pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e);
+        }
+        context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
+                pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+    }
+
+    /**
+     * Builds a consumer (durable subscription) or reader (non-durable). It is either shared through
+     * {@link SharedPulsarClient} or dedicated to this spout, depending on configuration.
+     *
+     * @throws PulsarClientException if creation fails; non-Pulsar async failures are wrapped rather than cast
+     */
+    private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
+        sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+        PulsarSpoutConsumer consumer;
+        if (pulsarSpoutConf.isSharedConsumerEnabled()) {
+            consumer = pulsarSpoutConf.isDurableSubscription()
+                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
+                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
+        } else {
+            try {
+                consumer = pulsarSpoutConf.isDurableSubscription()
+                        ? new SpoutConsumer(
+                                sharedPulsarClient.getClient().subscribeAsync(newConsumerConfiguration()).join())
+                        : new SpoutReader(
+                                sharedPulsarClient.getClient().createReaderAsync(newReaderConfiguration()).join());
+            } catch (CompletionException e) {
+                // A blind cast would turn any non-Pulsar cause into a ClassCastException that hides the real error.
+                Throwable cause = e.getCause();
+                if (cause instanceof PulsarClientException) {
+                    throw (PulsarClientException) cause;
+                }
+                throw new PulsarClientException(cause != null ? cause : e);
+            }
+        }
+        return consumer;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
+    }
+
+    /**
+     * Maps {@code msg} to tuple values and emits them, anchored on the message.
+     * <p>
+     * A {@link PulsarTuple} is emitted on its own output stream. A {@code null} mapping acks the message without
+     * emitting it.
+     * </p>
+     *
+     * @return {@code true} if a tuple was emitted
+     */
+    private boolean mapToValueAndEmit(Message<byte[]> msg) {
+        if (msg != null) {
+            Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
+            ++pendingAcks;
+            if (values == null) {
+                // since the mapper returned null, we can drop the message and
+                // ack it immediately
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId());
+                }
+                ack(msg);
+            } else {
+                if (values instanceof PulsarTuple) {
+                    collector.emit(((PulsarTuple) values).getOutputStream(), values, msg);
+                } else {
+                    collector.emit(values, msg);
+                }
+                ++messagesEmitted;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Retry bookkeeping for a failed message: the time of its first failure and how many retries were queued.
+     */
+    public class MessageRetries {
+        private final long timestampInNano;
+        private int numRetries;
+
+        public MessageRetries() {
+            this.timestampInNano = System.nanoTime();
+            this.numRetries = 0;
+        }
+
+        public long getTimeStamp() {
+            return timestampInNano;
+        }
+
+        public int incrementAndGet() {
+            return ++numRetries;
+        }
+
+        public int getNumRetries() {
+            return numRetries;
+        }
+    }
+
+    /**
+     * Helpers for metrics.
+     */
+
+    @SuppressWarnings({ "rawtypes" })
+    ConcurrentMap getMetrics() {
+        metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
+        metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
+        metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+        metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+        metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
+        metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
+        metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+        metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
+                ((double) messageSizeReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+        return metricsMap;
+    }
+
+    // pendingAcks is a gauge, not a per-interval counter, so it is intentionally not reset here.
+    void resetMetrics() {
+        messagesReceived = 0;
+        messagesEmitted = 0;
+        messageSizeReceived = 0;
+        messagesFailed = 0;
+        messageNotAvailableCount = 0;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Object getValueAndReset() {
+        ConcurrentMap metrics = getMetrics();
+        resetMetrics();
+        return metrics;
+    }
+
+    private ReaderConfigurationData<byte[]> newReaderConfiguration() {
+        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
+        readerConf.setTopicName(pulsarSpoutConf.getTopic());
+        readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
+        readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+        if (this.consumerConf != null) {
+            readerConf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
+            readerConf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
+            readerConf.setReadCompacted(consumerConf.isReadCompacted());
+            readerConf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
+        }
+        return readerConf;
+    }
+
+    private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
+        ConsumerConfigurationData<byte[]> consumerConf = this.consumerConf != null ? this.consumerConf
+                : new ConsumerConfigurationData<>();
+        consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
+        consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+        consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
+        return consumerConf;
+    }
+
+    /** {@link PulsarSpoutConsumer} backed by a durable Pulsar {@link Consumer}. */
+    static class SpoutConsumer implements PulsarSpoutConsumer {
+        private Consumer<byte[]> consumer;
+
+        SpoutConsumer(Consumer<byte[]> consumer) {
+            super();
+            this.consumer = consumer;
+        }
+
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return consumer.receive(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            consumer.acknowledgeAsync(msg);
+        }
+
+        @Override
+        public void close() throws PulsarClientException {
+            consumer.close();
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            consumer.unsubscribe();
+        }
+
+    }
+
+    /** {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}; ack and unsubscribe are no-ops. */
+    static class SpoutReader implements PulsarSpoutConsumer {
+        private Reader<byte[]> reader;
+
+        SpoutReader(Reader<byte[]> reader) {
+            super();
+            this.reader = reader;
+        }
+
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return reader.readNext(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            // No-op
+        }
+
+        @Override
+        public void close() throws PulsarClientException {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            // No-op
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
new file mode 100644
index 0000000..db797ee
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -0,0 +1,195 @@
+/**
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionType; + +/** + * Class used to specify pulsar spout configuration + * + * + */ +public class PulsarSpoutConfiguration extends PulsarStormConfiguration { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60); + public static final int DEFAULT_MAX_FAILED_RETRIES = -1; + + private String subscriptionName = null; + private MessageToValuesMapper messageToValuesMapper = null; + private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO; + private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES; + private boolean sharedConsumerEnabled = false; + + private SubscriptionType subscriptionType = SubscriptionType.Shared; + private boolean autoUnsubscribe = false; + private boolean durableSubscription = true; + // read position if non-durable subscription is enabled : default oldest message available in topic + private MessageId 
nonDurableSubscriptionReadPosition = MessageId.earliest; + + + /** + * @return the subscription name for the consumer in the spout + */ + public String getSubscriptionName() { + return subscriptionName; + } + + /** + * Sets the subscription name for the consumer in the spout + * + * @param subscriptionName + */ + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public SubscriptionType getSubscriptionType() { + return subscriptionType; + } + + public void setSubscriptionType(SubscriptionType subscriptionType) { + this.subscriptionType = subscriptionType; + } + + /** + * @return the mapper to convert pulsar message to a storm tuple + */ + public MessageToValuesMapper getMessageToValuesMapper() { + return messageToValuesMapper; + } + + /** + * Sets the mapper to convert pulsar message to a storm tuple. + * <p> + * Note: If the mapper returns null, the message is not emitted to the collector and is acked immediately + * </p> + * + * @param mapper + */ + public void setMessageToValuesMapper(MessageToValuesMapper mapper) { + this.messageToValuesMapper = Objects.requireNonNull(mapper); + } + + /** + * + * @param unit + * @return the timeout for retrying failed messages + */ + public long getFailedRetriesTimeout(TimeUnit unit) { + return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS); + } + + /** + * Sets the timeout within which the spout will re-inject failed messages with an exponential backoff <i>(default: + * 60 seconds)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message will + * be retried forever till it is successfully processed or max message retry count is reached, whichever comes + * first. 
+ * + * @param failedRetriesTimeout + * @param unit + */ + public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) { + this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout); + } + + /** + * + * @return the maximum number of times a failed message will be retried + */ + public int getMaxFailedRetries() { + return maxFailedRetries; + } + + /** + * Sets the maximum number of times the spout will re-inject failed messages with an exponential backoff + * <i>(default: -1)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message + * will be retried forever till it is successfully processed or configured timeout expires, whichever comes first. + * + * @param maxFailedRetries + */ + public void setMaxFailedRetries(int maxFailedRetries) { + this.maxFailedRetries = maxFailedRetries; + } + + /** + * + * @return if the consumer is shared across different executors of a spout + */ + public boolean isSharedConsumerEnabled() { + return sharedConsumerEnabled; + } + + /** + * Sets whether the consumer will be shared across different executors of a spout. <i>(default: false)</i> + * + * @param sharedConsumerEnabled + */ + public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) { + this.sharedConsumerEnabled = sharedConsumerEnabled; + } + + public boolean isAutoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * It unsubscribes the subscription when spout gets closed in the topology. + * + * @param autoUnsubscribe + */ + public void setAutoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + public boolean isDurableSubscription() { + return durableSubscription; + } + + /** + * if subscription is not durable then it creates non-durable reader to start reading from the + * {@link #setNonDurableSubscriptionReadPosition(MessagePosition)} in topic. 
+ * + * @param durableSubscription + */ + public void setDurableSubscription(boolean durableSubscription) { + this.durableSubscription = durableSubscription; + } + + public MessageId getNonDurableSubscriptionReadPosition() { + return nonDurableSubscriptionReadPosition; + } + + /** + * Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest. + * + * @param nonDurableSubscriptionReadPosition + */ + public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) { + this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition; + } +} diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java new file mode 100644 index 0000000..5502a62 --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +public interface PulsarSpoutConsumer { + + /** + * Receives a single message. + * + * @param waitTime + * @param unit + * @return + * @throws PulsarClientException + */ + Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException; + + /** + * Ack the message async. + * + * @param msg + */ + void acknowledgeAsync(Message<?> msg); + + /** + * unsubscribe the consumer + * @throws PulsarClientException + */ + void unsubscribe() throws PulsarClientException; + + /** + * Close the consumer + * + * @throws PulsarClientException + */ + void close() throws PulsarClientException; + +} diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java new file mode 100644 index 0000000..7082bf2 --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import java.io.Serializable; + +/** + * Class used to specify pulsar storm configurations like service url and topic + * + * + */ +public class PulsarStormConfiguration implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60; + + private String serviceUrl = null; + private String topic = null; + private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS; + + /** + * Get service url. + * @return the service URL to connect to from the client. + */ + public String getServiceUrl() { + return serviceUrl; + } + + /** + * Sets the service URL to connect to from the client. + * + * @param serviceUrl - service url + */ + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + /** + * Get topic. + * @return the topic name for the producer/consumer. + */ + public String getTopic() { + return topic; + } + + /** + * Sets the topic name for the producer/consumer. It should be of the format + * {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}. + * + * @param topic - topic name + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * Get metrics interval. + * @return the time interval in seconds for metrics generation. + */ + public int getMetricsTimeIntervalInSecs() { + return metricsTimeIntervalInSecs; + } + + /** + * Sets the time interval in seconds for metrics generation <i>(default: 60 seconds)</i>. + * + * @param metricsTimeIntervalInSecs - metrics interval in sec. 
+ */ + public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) { + this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs; + } + +} \ No newline at end of file diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java new file mode 100644 index 0000000..b000827 --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + + +import org.apache.storm.tuple.Values; + +/** + * Returned by MessageToValuesMapper, this specifies the Values + * for an output tuple and the stream it should be sent to. + */ +public class PulsarTuple extends Values { + + protected final String outputStream; + + public PulsarTuple(String outStream, Object ... values) { + super(values); + outputStream = outStream; + } + + /** + * Return stream the tuple should be emitted on. 
+ * + * @return String + */ + public String getOutputStream() { + return outputStream; + } +} diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java new file mode 100644 index 0000000..ce7bfb9 --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SharedPulsarClient { + private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class); + private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>(); + + private final String componentId; + private final PulsarClientImpl client; + private final AtomicInteger counter = new AtomicInteger(); + + private Consumer<byte[]> consumer; + private Reader<byte[]> reader; + private Producer<byte[]> producer; + + private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) throws PulsarClientException { + this.client = new PulsarClientImpl(clientConf); + this.componentId = componentId; + } + + /** + * Provides a shared pulsar client that is shared across all different tasks + * in the same component. Different components will not share the pulsar + * client since they can have different configurations. 
+ * + * @param componentId + * - the id of the spout/bolt + * @param clientConf + * - client config + * @return SharedPulsarClient + * @throws PulsarClientException + * in case of an error + */ + public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf) + throws PulsarClientException { + AtomicReference<PulsarClientException> exception = new AtomicReference<PulsarClientException>(); + instances.computeIfAbsent(componentId, pulsarClient -> { + SharedPulsarClient sharedPulsarClient = null; + try { + sharedPulsarClient = new SharedPulsarClient(componentId, clientConf); + LOG.info("[{}] Created a new Pulsar Client.", componentId); + } catch (PulsarClientException e) { + exception.set(e); + } + return sharedPulsarClient; + }); + if (exception.get() != null) { + throw exception.get(); + } + return instances.get(componentId); + } + + public PulsarClientImpl getClient() { + counter.incrementAndGet(); + return client; + } + + public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf) + throws PulsarClientException { + counter.incrementAndGet(); + synchronized (this) { + if (consumer == null) { + try { + consumer = client.subscribeAsync(consumerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic()); + } else { + LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic()); + } + } + return consumer; + } + + public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException { + counter.incrementAndGet(); + synchronized (this) { + if (reader == null) { + try { + reader = client.createReaderAsync(readerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName()); + } 
else { + LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName()); + } + } + return reader; + } + + public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException { + counter.incrementAndGet(); + synchronized (this) { + if (producer == null) { + try { + producer = client.createProducerAsync(producerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName()); + } else { + LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName()); + } + } + return producer; + } + + public void close() throws PulsarClientException { + if (counter.decrementAndGet() <= 0) { + if (client != null) { + client.close(); + instances.remove(componentId); + LOG.info("[{}] Closed Pulsar Client", componentId); + } + } + } + +} \ No newline at end of file diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java new file mode 100644 index 0000000..452e0ce --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + +import java.io.Serializable; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; + +public interface TupleToMessageMapper extends Serializable { + + /** + * Convert tuple to {@link org.apache.pulsar.client.api.Message}. + * + * @param tuple + * @return + * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)} + */ + @Deprecated + default Message<byte[]> toMessage(Tuple tuple) { + return null; + } + + /** + * Set the value on a message builder to prepare the message to be published from the Bolt. + * + * @param tuple + * @return + */ + default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { + // Default implementation provided for backward compatibility + Message<byte[]> msg = toMessage(tuple); + msgBuilder.value(msg.getData()) + .properties(msg.getProperties()); + if (msg.hasKey()) { + msgBuilder.key(msg.getKey()); + } + return msgBuilder; + } + + + /** + * Declare the output schema for the bolt. 
+ * + * @param declarer + */ + public void declareOutputFields(OutputFieldsDeclarer declarer); +} diff --git a/pulsar-storm/src/main/javadoc/overview.html b/pulsar-storm/src/main/javadoc/overview.html new file mode 100644 index 0000000..a1595eb --- /dev/null +++ b/pulsar-storm/src/main/javadoc/overview.html @@ -0,0 +1,29 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"> +<HTML> + <HEAD> + <TITLE>Pulsar Storm API Overview</TITLE> + </HEAD> + <BODY> + The Pulsar Storm API is a proprietary messaging API. + </BODY> +</HTML> diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java new file mode 100644 index 0000000..e6cbc51 --- /dev/null +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; 
+import org.apache.storm.tuple.Values; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; + +public class PulsarSpoutTest { + + @Test + public void testAckFailedMessage() throws Exception { + + PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); + conf.setServiceUrl("http://localhost:8080"); + conf.setSubscriptionName("sub1"); + conf.setTopic("persistent://prop/ns1/topic1"); + conf.setSubscriptionType(SubscriptionType.Exclusive); + conf.setMessageToValuesMapper(new MessageToValuesMapper() { + @Override + public Values toValues(Message<byte[]> msg) { + return null; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + }); + + ClientBuilder builder = spy(new ClientBuilderImpl()); + PulsarSpout spout = spy(new PulsarSpout(conf, builder)); + + Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), + new byte[0], Schema.BYTES, new MessageMetadata()); + Consumer<byte[]> consumer = mock(Consumer.class); + SpoutConsumer spoutConsumer = new SpoutConsumer(consumer); + CompletableFuture<Void> future = new CompletableFuture<>(); + future.complete(null); + doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId()); + Field consField = PulsarSpout.class.getDeclaredField("consumer"); + consField.setAccessible(true); + consField.set(spout, spoutConsumer); + + spout.fail(msg); + spout.ack(msg); + spout.emitNextAvailableTuple(); + verify(consumer, atLeast(1)).receive(anyInt(), any()); + } + + @Test + public void testPulsarTuple() throws Exception { + testPulsarSpout(true); + } + + @Test + public void testPulsarSpout() throws Exception { + testPulsarSpout(false); + } + + public void testPulsarSpout(boolean pulsarTuple) throws Exception { + PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); + conf.setServiceUrl("http://localhost:8080"); + conf.setSubscriptionName("sub1"); + conf.setTopic("persistent://prop/ns1/topic1"); + 
conf.setSubscriptionType(SubscriptionType.Exclusive); + conf.setSharedConsumerEnabled(true); + AtomicBoolean called = new AtomicBoolean(false); + conf.setMessageToValuesMapper(new MessageToValuesMapper() { + @Override + public Values toValues(Message<byte[]> msg) { + called.set(true); + if ("message to be dropped".equals(new String(msg.getData()))) { + return null; + } + String val = new String(msg.getData()); + if (val.startsWith("stream:")) { + String stream = val.split(":")[1]; + return new PulsarTuple(stream, val); + } + return new Values(val); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + }); + + String msgContent = pulsarTuple ? "stream:pstream" : "test"; + + ClientBuilder builder = spy(new ClientBuilderImpl()); + PulsarSpout spout = spy(new PulsarSpout(conf, builder)); + TopologyContext context = mock(TopologyContext.class); + final String componentId = "test-component-id"; + doReturn(componentId).when(context).getThisComponentId(); + SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + Map config = new HashMap<>(); + Field field = SharedPulsarClient.class.getDeclaredField("instances"); + field.setAccessible(true); + ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field + .get(SharedPulsarClient.class); + + SharedPulsarClient client = mock(SharedPulsarClient.class); + Consumer<byte[]> consumer = mock(Consumer.class); + when(client.getSharedConsumer(any())).thenReturn(consumer); + instances.put(componentId, client); + + Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), + msgContent.getBytes(), Schema.BYTES, new MessageMetadata()); + when(consumer.receive(anyInt(), any())).thenReturn(msg); + + spout.open(config, context, collector); + spout.emitNextAvailableTuple(); + + assertTrue(called.get()); + verify(consumer, atLeast(1)).receive(anyInt(), any()); + ArgumentCaptor<Values> capt = 
ArgumentCaptor.forClass(Values.class); + if (pulsarTuple) { + verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg)); + } else { + verify(collector, times(1)).emit(capt.capture(), eq(msg)); + } + Values vals = capt.getValue(); + assertEquals(msgContent, vals.get(0)); + } + +} diff --git a/tests/pom.xml b/tests/pom.xml index 0844bae..18ccb15 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -33,6 +33,7 @@ <name>Apache Pulsar Adapters :: Tests</name> <modules> <module>pulsar-kafka-compat-client-test</module> + <module>pulsar-storm-test</module> <module>pulsar-spark-test</module> </modules> <build> diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml new file mode 100644 index 0000000..3134328 --- /dev/null +++ b/tests/pulsar-storm-test/pom.xml @@ -0,0 +1,131 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.pulsar.tests</groupId> + <artifactId>adapters-tests-parent</artifactId> + <version>2.11.0-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-storm-test</artifactId> + <packaging>jar</packaging> + <name>Pulsar Storm adapter Tests</name> + + <dependencies> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-storm</artifactId> + <version>2.11.0-SNAPSHOT</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-server</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>buildtools</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-broker</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-broker</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>testmocks</artifactId> + 
<scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.asynchttpclient</groupId> + <artifactId>async-http-client</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> +</project> diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java new file mode 100644 index 0000000..4355ad6 --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import java.util.Collection; +import java.util.List; + +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.Tuple; + +public class MockOutputCollector implements IOutputCollector { + + private boolean acked = false; + private boolean failed = false; + private Throwable lastError = null; + private Tuple ackedTuple = null; + private int numTuplesAcked = 0; + + @Override + public void reportError(Throwable error) { + lastError = error; + } + + @Override + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + + @Override + public void ack(Tuple input) { + acked = true; + failed = false; + ackedTuple = input; + ++numTuplesAcked; + } + + @Override + public void fail(Tuple input) { + failed = true; + acked = false; + } + + @Override + public void resetTimeout(Tuple tuple) { + + } + + public boolean acked() { + return acked; + } + + public boolean failed() { + return failed; + } + + public Throwable getLastError() { + return lastError; + } + + public Tuple getAckedTuple() { + return ackedTuple; + } + + public int getNumTuplesAcked() { + return numTuplesAcked; + } + + public void reset() { + acked = false; + failed = false; + lastError = null; + ackedTuple = null; + numTuplesAcked = 0; + } + + @Override + public void flush() { + // Nothing to flush from buffer + } + +} diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java new file mode 100644 index 0000000..98c8d20 --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.pulsar.client.api.Message; + +import org.apache.storm.spout.ISpoutOutputCollector; + +public class MockSpoutOutputCollector implements ISpoutOutputCollector { + + private boolean emitted = false; + private Message lastMessage = null; + private String data = null; + + @Override + public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { + emitted = true; + data = (String) tuple.get(0); + lastMessage = (Message) messageId; + return new ArrayList<Integer>(); + } + + @Override + public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { + emitted = true; + data = (String) tuple.get(0); + lastMessage = (Message) messageId; + } + + @Override + public long getPendingCount() { + return 0; + } + + @Override + public void reportError(Throwable error) { + } + + public boolean emitted() { + return emitted; + } + + public String getTupleData() { + return data; + } + + public Message getLastMessage() { + return lastMessage; + } + + public void reset() { + emitted = false; + data = null; + lastMessage = null; + } + + @Override + public void flush() { + // Nothing to flush from buffer + } +} diff 
--git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java new file mode 100644 index 0000000..b90e855 --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.fail; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.collections.Maps; + +public class PulsarBoltTest extends ProducerConsumerBase { + + private static final int NO_OF_RETRIES = 10; + + public String serviceUrl; + public final String topic = "persistent://my-property/my-ns/my-topic1"; + public final String subscriptionName = "my-subscriber-name"; + + protected PulsarBoltConfiguration pulsarBoltConf; + protected PulsarBolt bolt; + protected MockOutputCollector mockCollector; + protected Consumer consumer; + + @Override + @BeforeMethod + public void beforeMethod(Method m) throws Exception { + super.beforeMethod(m); + setup(); + } + + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + serviceUrl = pulsar.getWebServiceAddress(); + + pulsarBoltConf = new PulsarBoltConfiguration(); + pulsarBoltConf.setServiceUrl(serviceUrl); + pulsarBoltConf.setTopic(topic); + pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper); + pulsarBoltConf.setMetricsTimeIntervalInSecs(60); + bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); + 
mockCollector = new MockOutputCollector(); + OutputCollector collector = new OutputCollector(mockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName); + when(context.getThisTaskId()).thenReturn(0); + bolt.prepare(Maps.newHashMap(), context, collector); + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe(); + } + + @AfterMethod + public void cleanup() throws Exception { + bolt.close(); + consumer.close(); + super.internalCleanup(); + } + + @SuppressWarnings("serial") + static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() { + + @Override + public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { + if ("message to be dropped".equals(new String(tuple.getBinary(0)))) { + return null; + } + if ("throw exception".equals(new String(tuple.getBinary(0)))) { + throw new RuntimeException(); + } + return msgBuilder.value(tuple.getBinary(0)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + }; + + private Tuple getMockTuple(String msgContent) { + Tuple mockTuple = mock(Tuple.class); + when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes()); + when(mockTuple.getSourceComponent()).thenReturn(""); + when(mockTuple.getSourceStreamId()).thenReturn(""); + return mockTuple; + } + + @Test + public void testBasic() throws Exception { + String msgContent = "hello world"; + Tuple tuple = getMockTuple(msgContent); + bolt.execute(tuple); + for (int i = 0; i < NO_OF_RETRIES; i++) { + Thread.sleep(1000); + if (mockCollector.acked()) { + break; + } + } + Assert.assertTrue(mockCollector.acked()); + Assert.assertFalse(mockCollector.failed()); + Assert.assertNull(mockCollector.getLastError()); + Assert.assertEquals(tuple, mockCollector.getAckedTuple()); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledge(msg); + 
Assert.assertEquals(msgContent, new String(msg.getData())); + } + + @Test + public void testExecuteFailure() throws Exception { + String msgContent = "throw exception"; + Tuple tuple = getMockTuple(msgContent); + bolt.execute(tuple); + Assert.assertFalse(mockCollector.acked()); + Assert.assertTrue(mockCollector.failed()); + Assert.assertNotNull(mockCollector.getLastError()); + } + + @Test + public void testNoMessageSend() throws Exception { + String msgContent = "message to be dropped"; + Tuple tuple = getMockTuple(msgContent); + bolt.execute(tuple); + Assert.assertTrue(mockCollector.acked()); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNull(msg); + } + + @Test + public void testMetrics() throws Exception { + bolt.resetMetrics(); + String msgContent = "hello world"; + Tuple tuple = getMockTuple(msgContent); + for (int i = 0; i < 10; i++) { + bolt.execute(tuple); + } + for (int i = 0; i < NO_OF_RETRIES; i++) { + Thread.sleep(1000); + if (mockCollector.getNumTuplesAcked() == 10) { + break; + } + } + @SuppressWarnings("rawtypes") + Map metrics = (Map) bolt.getValueAndReset(); + Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10); + Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(), + 10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs()); + Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(), + ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs()); + metrics = bolt.getMetrics(); + Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledge(msg); + } + } + + @Test + public void testSharedProducer() throws Exception { + TopicStats topicStats = admin.topics().getStats(topic); + Assert.assertEquals(topicStats.getPublishers().size(), 1); + PulsarBolt 
otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); + MockOutputCollector otherMockCollector = new MockOutputCollector(); + OutputCollector collector = new OutputCollector(otherMockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName); + when(context.getThisTaskId()).thenReturn(1); + otherBolt.prepare(Maps.newHashMap(), context, collector); + + topicStats = admin.topics().getStats(topic); + Assert.assertEquals(topicStats.getPublishers().size(), 1); + + otherBolt.close(); + + topicStats = admin.topics().getStats(topic); + Assert.assertEquals(topicStats.getPublishers().size(), 1); + } + + @Test + public void testSerializability() throws Exception { + // test serializability with no auth + PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); + TestUtil.testSerializability(boltWithNoAuth); + } + + @Test + public void testFailedProducer() { + PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration(); + pulsarBoltConf.setServiceUrl(serviceUrl); + pulsarBoltConf.setTopic("persistent://invalid"); + pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper); + pulsarBoltConf.setMetricsTimeIntervalInSecs(60); + PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); + MockOutputCollector mockCollector = new MockOutputCollector(); + OutputCollector collector = new OutputCollector(mockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("new" + methodName); + when(context.getThisTaskId()).thenReturn(0); + try { + bolt.prepare(Maps.newHashMap(), context, collector); + fail("should have failed as producer creation failed"); + } catch (IllegalStateException ie) { + // Ok. 
+ } + } +} diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java new file mode 100644 index 0000000..322e41b --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.collections.Maps; + +public class PulsarSpoutTest extends ProducerConsumerBase { + + public String serviceUrl; + public final String topic = "persistent://my-property/my-ns/my-topic1"; + public final String subscriptionName = "my-subscriber-name"; + + protected PulsarSpoutConfiguration pulsarSpoutConf; + protected PulsarSpout spout; + protected MockSpoutOutputCollector mockCollector; + protected Producer producer; + + @Override + @BeforeMethod + public void beforeMethod(Method m) throws Exception { + super.beforeMethod(m); + setup(); + } + + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + serviceUrl = pulsar.getWebServiceAddress(); + + pulsarSpoutConf = new PulsarSpoutConfiguration(); + pulsarSpoutConf.setServiceUrl(serviceUrl); + pulsarSpoutConf.setTopic(topic); + pulsarSpoutConf.setSubscriptionName(subscriptionName); + 
pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper); + pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS); + pulsarSpoutConf.setMaxFailedRetries(2); + pulsarSpoutConf.setSharedConsumerEnabled(true); + pulsarSpoutConf.setMetricsTimeIntervalInSecs(60); + pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared); + spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); + mockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); + when(context.getThisTaskId()).thenReturn(0); + spout.open(Maps.newHashMap(), context, collector); + producer = pulsarClient.newProducer().topic(topic).create(); + } + + @AfterMethod + public void cleanup() throws Exception { + producer.close(); + spout.close(); + super.internalCleanup(); + } + + @SuppressWarnings("serial") + public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() { + + @Override + public Values toValues(Message msg) { + if ("message to be dropped".equals(new String(msg.getData()))) { + return null; + } + return new Values(new String(msg.getData())); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + }; + + @Test + public void testBasic() throws Exception { + String msgContent = "hello world"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.ack(mockCollector.getLastMessage()); + } + + @Test + public void testRedeliverOnFail() throws Exception { + String msgContent = "hello world"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + spout.fail(mockCollector.getLastMessage()); + mockCollector.reset(); + Thread.sleep(150); + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + 
assertEquals(mockCollector.getTupleData(), msgContent); + spout.ack(mockCollector.getLastMessage()); + } + + @Test + public void testNoRedeliverOnAck() throws Exception { + String msgContent = "hello world"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + spout.ack(mockCollector.getLastMessage()); + mockCollector.reset(); + spout.nextTuple(); + assertFalse(mockCollector.emitted()); + assertNull(mockCollector.getTupleData()); + } + + @Test + public void testLimitedRedeliveriesOnTimeout() throws Exception { + String msgContent = "chuck norris"; + producer.send(msgContent.getBytes()); + + long startTime = System.currentTimeMillis(); + while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System + .currentTimeMillis()) { + mockCollector.reset(); + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.fail(mockCollector.getLastMessage()); + // wait to avoid backoff + Thread.sleep(500); + } + spout.nextTuple(); + spout.fail(mockCollector.getLastMessage()); + mockCollector.reset(); + Thread.sleep(500); + spout.nextTuple(); + assertFalse(mockCollector.emitted()); + assertNull(mockCollector.getTupleData()); + } + + @Test + public void testLimitedRedeliveriesOnCount() throws Exception { + String msgContent = "hello world"; + producer.send(msgContent.getBytes()); + + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.fail(mockCollector.getLastMessage()); + + mockCollector.reset(); + Thread.sleep(150); + + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.fail(mockCollector.getLastMessage()); + + mockCollector.reset(); + Thread.sleep(300); + + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.fail(mockCollector.getLastMessage()); + + 
mockCollector.reset(); + Thread.sleep(500); + spout.nextTuple(); + assertFalse(mockCollector.emitted()); + assertNull(mockCollector.getTupleData()); + } + + @Test + public void testBackoffOnRetry() throws Exception { + String msgContent = "chuck norris"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + spout.fail(mockCollector.getLastMessage()); + mockCollector.reset(); + // due to backoff we should not get the message again immediately + spout.nextTuple(); + assertFalse(mockCollector.emitted()); + assertNull(mockCollector.getTupleData()); + Thread.sleep(100); + spout.nextTuple(); + assertTrue(mockCollector.emitted()); + assertEquals(mockCollector.getTupleData(), msgContent); + spout.ack(mockCollector.getLastMessage()); + } + + @Test + public void testMessageDrop() throws Exception { + String msgContent = "message to be dropped"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + assertFalse(mockCollector.emitted()); + assertNull(mockCollector.getTupleData()); + } + + @SuppressWarnings({ "rawtypes" }) + @Test + public void testMetrics() throws Exception { + spout.resetMetrics(); + String msgContent = "hello world"; + producer.send(msgContent.getBytes()); + spout.nextTuple(); + Map metrics = spout.getMetrics(); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1); + assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(), + 1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs()); + assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(), + ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs()); + spout.fail(mockCollector.getLastMessage()); + metrics = 
spout.getMetrics(); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0); + Thread.sleep(150); + spout.nextTuple(); + metrics = spout.getMetrics(); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1); + spout.ack(mockCollector.getLastMessage()); + metrics = (Map) spout.getValueAndReset(); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0); + assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0); + } + + @Test + public void testSharedConsumer() throws Exception { + TopicStats topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); + PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); + MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); + when(context.getThisTaskId()).thenReturn(1); + otherSpout.open(Maps.newHashMap(), context, collector); + + topicStats = 
admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); + + otherSpout.close(); + + topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); + } + + @Test + public void testNoSharedConsumer() throws Exception { + TopicStats topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); + pulsarSpoutConf.setSharedConsumerEnabled(false); + PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); + MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); + when(context.getThisTaskId()).thenReturn(1); + otherSpout.open(Maps.newHashMap(), context, collector); + + topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 2); + + otherSpout.close(); + + topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); + } + + @Test + public void testSerializability() throws Exception { + // test serializability with no auth + PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); + TestUtil.testSerializability(spoutWithNoAuth); + } + + @Test + public void testFailedConsumer() { + PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration(); + pulsarSpoutConf.setServiceUrl(serviceUrl); + pulsarSpoutConf.setTopic("persistent://invalidTopic"); + pulsarSpoutConf.setSubscriptionName(subscriptionName); + pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper); + 
pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS); + pulsarSpoutConf.setMaxFailedRetries(2); + pulsarSpoutConf.setSharedConsumerEnabled(false); + pulsarSpoutConf.setMetricsTimeIntervalInSecs(60); + pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared); + PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); + MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); + TopologyContext context = mock(TopologyContext.class); + when(context.getThisComponentId()).thenReturn("new-test" + methodName); + when(context.getThisTaskId()).thenReturn(0); + try { + spout.open(Maps.newHashMap(), context, collector); + fail("should have failed as consumer creation failed"); + } catch (IllegalStateException e) { + // Ok + } + } +} diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java new file mode 100644 index 0000000..a71e088 --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; + +import org.testng.Assert; + +public class TestUtil { + + public static void testSerializability(Object object) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + oos.close(); + Assert.assertTrue(out.toByteArray().length > 0); + } +} diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java new file mode 100644 index 0000000..93404ea --- /dev/null +++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.storm.example; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.storm.MessageToValuesMapper; +import org.apache.pulsar.storm.PulsarBolt; +import org.apache.pulsar.storm.PulsarBoltConfiguration; +import org.apache.pulsar.storm.PulsarSpout; +import org.apache.pulsar.storm.PulsarSpoutConfiguration; +import org.apache.pulsar.storm.TupleToMessageMapper; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.IErrorReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StormExample { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class); + private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080"; + + @SuppressWarnings("serial") + static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() { + + @Override + public Values toValues(Message msg) { + return new Values(new String(msg.getData())); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // declare the output fields + declarer.declare(new Fields("string")); + } + }; + + @SuppressWarnings("serial") + static TupleToMessageMapper tupleToMessageMapper = new 
TupleToMessageMapper() { + + @Override + public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { + String receivedMessage = tuple.getString(0); + // message processing + String processedMsg = receivedMessage + "-processed"; + return msgBuilder.value(processedMsg.getBytes()); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // declare the output fields + } + }; + + public static void main(String[] args) throws Exception { + // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication"; + // String authParams = "key1:val1,key2:val2"; + // clientConf.setAuthentication(authPluginClassName, authParams); + + String topic1 = "persistent://my-property/use/my-ns/my-topic1"; + String topic2 = "persistent://my-property/use/my-ns/my-topic2"; + String subscriptionName1 = "my-subscriber-name1"; + String subscriptionName2 = "my-subscriber-name2"; + + // create spout + PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration(); + spoutConf.setServiceUrl(serviceUrl); + spoutConf.setTopic(topic1); + spoutConf.setSubscriptionName(subscriptionName1); + spoutConf.setMessageToValuesMapper(messageToValuesMapper); + PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder()); + + // create bolt + PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration(); + boltConf.setServiceUrl(serviceUrl); + boltConf.setTopic(topic2); + boltConf.setTupleToMessageMapper(tupleToMessageMapper); + PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder()); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("testSpout", spout); + builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout"); + + Config conf = new Config(); + conf.setNumWorkers(2); + conf.setDebug(true); + conf.registerMetricsConsumer(PulsarMetricsConsumer.class); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + 
Utils.sleep(10000); + + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); + // create a consumer on topic2 to receive messages from the bolt when the processing is done + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe(); + // create a producer on topic1 to send messages that will be received by the spout + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create(); + + for (int i = 0; i < 10; i++) { + String msg = "msg-" + i; + producer.send(msg.getBytes()); + LOG.info("Message {} sent", msg); + } + Message<byte[]> msg = null; + for (int i = 0; i < 10; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + LOG.info("Message {} received", new String(msg.getData())); + } + cluster.killTopology("test"); + cluster.shutdown(); + + } + + class PulsarMetricsConsumer implements IMetricsConsumer { + + @Override + public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, + IErrorReporter errorReporter) { + } + + @Override + public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { + // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology. + // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt + // is + // "PulsarBoltMetrics-{componentId}-{taskIndex}". + } + + @Override + public void cleanup() { + } + + } +}